What is the difference between futures::stream::StreamExt::ready_chunks and chunks for buffering stream items?

chunks buffers items until n have been collected (or the stream ends) and yields nothing in the meantime, even when some items are already available. ready_chunks yields whatever items are immediately available, up to n, as soon as the underlying stream would otherwise make it wait. This distinction affects latency and batch size in stream processing pipelines.

Stream Chunking Basics

use futures::stream::{self, StreamExt};
 
async fn chunking_basics() {
    // Streams produce items one at a time
    let stream = stream::iter(vec![1, 2, 3, 4, 5, 6]);
    
    // Chunking groups consecutive items into Vecs
    // This is useful for batching operations
    let chunked = stream.chunks(3);
    
    // Process Vec<i32> instead of individual i32
    let result: Vec<Vec<i32>> = chunked.collect().await;
    // result = [[1, 2, 3], [4, 5, 6]]
}

Chunking transforms a stream of items into a stream of Vec batches.

The chunks Method

use futures::stream::{self, StreamExt};
 
async fn chunks_example() {
    let stream = stream::iter(vec![1, 2, 3, 4, 5]);
    
    // chunks(n) buffers up to n items before emitting
    let chunked = stream.chunks(3);
    
    let chunks: Vec<Vec<i32>> = chunked.collect().await;
    
    // chunks = [[1, 2, 3], [4, 5]]
    // Note: Last chunk may be smaller
}

chunks(3) buffers items until 3 are collected, then emits a Vec.

The ready_chunks Method

use futures::stream::{self, StreamExt};
 
async fn ready_chunks_example() {
    let stream = stream::iter(vec![1, 2, 3, 4, 5]);
    
    // ready_chunks(n) returns immediately available items
    let chunked = stream.ready_chunks(3);
    
    let chunks: Vec<Vec<i32>> = chunked.collect().await;
    
    // With an in-memory iterator, all items are "ready"
    // So ready_chunks behaves similarly to chunks here
    // chunks = [[1, 2, 3], [4, 5]]
}

ready_chunks(3) emits items that are immediately available without waiting.

Key Difference: Waiting Behavior

use futures::stream::{self, StreamExt};
use std::time::Duration;
use tokio::time::sleep;
 
async fn waiting_behavior() {
    // Simulate a stream with delays
    let stream = async_stream::stream! {
        yield 1;
        yield 2;
        sleep(Duration::from_millis(100)).await;
        yield 3;
        yield 4;
    };
    
    // chunks: Waits until buffer is full OR stream ends
    // If 3 items requested, waits for 3 even if 2 are ready
    
    // ready_chunks: Returns immediately available items
    // If items are ready, returns them without waiting
    
    // With chunks(3):
    // - Polls for items until 3 are buffered
    // - Emits when buffer is full
    
    // With ready_chunks(3):
    // - Polls for ready items
    // - Emits immediately when any items are ready
    // - Up to the max capacity
}

The key difference is how they handle items that require waiting.

Behavior with Async Sources

use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
 
async fn async_source_chunks() {
    // Stream that produces items at different rates
    let stream = async_stream::stream! {
        // Immediately available
        yield 1;
        yield 2;
        
        // Delay before next items
        sleep(Duration::from_millis(10)).await;
        yield 3;
        yield 4;
        yield 5;
        
        // Another delay
        sleep(Duration::from_millis(10)).await;
        yield 6;
    };
    
    // With chunks(3):
    // Items 1, 2 arrive, then the stream is Pending -> chunks stays Pending
    // Item 3 arrives after the delay -> emits [1, 2, 3]
    // Items 4, 5 arrive, Pending again; item 6 arrives -> emits [4, 5, 6]
    // Result: [[1, 2, 3], [4, 5, 6]]
    
    // With ready_chunks(3):
    // Items 1, 2 arrive, then the stream is Pending -> emits [1, 2]
    // Items 3, 4, 5 arrive after the delay and fill the buffer -> emits [3, 4, 5]
    // Item 6 arrives after the second delay, then the stream ends -> emits [6]
    // Result: [[1, 2], [3, 4, 5], [6]]
}

With async sources, ready_chunks emits items as they become available.

Immediate Availability

use futures::stream::{self, StreamExt};
 
async fn immediate_availability() {
    // A poll-based stream shows the difference more clearly
    // When items are all immediately available (like iter):
    // Both methods produce similar results
    
    let stream = stream::iter(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
    
    // All items are immediately available
    // Both chunks and ready_chunks see the same data
    
    let chunked = stream.chunks(3);
    let chunks: Vec<Vec<i32>> = chunked.collect().await;
    // [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
    
    let stream2 = stream::iter(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
    let ready_chunked = stream2.ready_chunks(3);
    let ready: Vec<Vec<i32>> = ready_chunked.collect().await;
    // [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
}

When all items are immediately available, both methods produce similar results.

The Poll Semantics

use futures::stream::{self, Stream, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};
 
async fn poll_semantics() {
    // chunks implementation (conceptual; take = std::mem::take):
    // fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<Vec<T>>> {
    //     loop {
    //         match inner.poll_next(cx) {
    //             Poll::Ready(Some(item)) => {
    //                 buffer.push(item);
    //                 if buffer.len() == capacity {
    //                     return Poll::Ready(Some(take(&mut buffer)));
    //                 }
    //             }
    //             // Stream ended: emit the partial final chunk, if any
    //             Poll::Ready(None) if buffer.is_empty() => return Poll::Ready(None),
    //             Poll::Ready(None) => return Poll::Ready(Some(take(&mut buffer))),
    //             // Keep the buffered items and wait to be woken
    //             Poll::Pending => return Poll::Pending,
    //         }
    //     }
    // }
    
    // ready_chunks implementation (conceptual):
    // fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<Vec<T>>> {
    //     loop {
    //         match inner.poll_next(cx) {
    //             Poll::Ready(Some(item)) => {
    //                 buffer.push(item);
    //                 if buffer.len() == capacity {
    //                     return Poll::Ready(Some(take(&mut buffer)));
    //                 }
    //             }
    //             Poll::Ready(None) if buffer.is_empty() => return Poll::Ready(None),
    //             Poll::Ready(None) => return Poll::Ready(Some(take(&mut buffer))),
    //             // Key difference: return what we have instead of waiting
    //             Poll::Pending if buffer.is_empty() => return Poll::Pending,
    //             Poll::Pending => return Poll::Ready(Some(take(&mut buffer))),
    //         }
    //     }
    // }
}

The key difference: when the inner stream returns Pending, chunks continues waiting while ready_chunks returns accumulated items.
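The two policies can be replayed without an executor. The sketch below is a simplified, std-only model and not the futures crate's source: `Script`, `poll_batch`, `drain`, and `delayed_script` are hypothetical names, the inner stream is replaced by a scripted queue of `Poll` results, and the driver simply re-polls after every `Pending` where a real executor would wait for a wakeup. It assumes a non-zero capacity.

```rust
use std::collections::VecDeque;
use std::mem::take;
use std::task::Poll;

/// Hypothetical stand-in for the inner stream: each poll pops the next scripted result.
struct Script(VecDeque<Poll<Option<i32>>>);

impl Script {
    fn poll(&mut self) -> Poll<Option<i32>> {
        // An exhausted script behaves like a finished stream.
        self.0.pop_front().unwrap_or(Poll::Ready(None))
    }
}

/// One poll of the adapter. `flush_on_pending = false` models `chunks`,
/// `true` models `ready_chunks`.
fn poll_batch(
    src: &mut Script,
    buf: &mut Vec<i32>,
    cap: usize,
    flush_on_pending: bool,
) -> Poll<Option<Vec<i32>>> {
    loop {
        match src.poll() {
            Poll::Ready(Some(item)) => {
                buf.push(item);
                if buf.len() == cap {
                    return Poll::Ready(Some(take(buf)));
                }
            }
            // End of stream: emit a partial final chunk if anything is buffered.
            Poll::Ready(None) if buf.is_empty() => return Poll::Ready(None),
            Poll::Ready(None) => return Poll::Ready(Some(take(buf))),
            // The only branch where the two adapters differ.
            Poll::Pending if flush_on_pending && !buf.is_empty() => {
                return Poll::Ready(Some(take(buf)));
            }
            Poll::Pending => return Poll::Pending,
        }
    }
}

/// Polls until the modelled stream ends and collects every emitted chunk.
fn drain(script: Vec<Poll<Option<i32>>>, cap: usize, flush_on_pending: bool) -> Vec<Vec<i32>> {
    let mut src = Script(script.into());
    let mut buf = Vec::new();
    let mut out = Vec::new();
    loop {
        match poll_batch(&mut src, &mut buf, cap, flush_on_pending) {
            Poll::Ready(Some(batch)) => out.push(batch),
            Poll::Ready(None) => return out,
            Poll::Pending => {} // the model re-polls immediately
        }
    }
}

/// Items 1, 2, a wait, items 3, 4, 5, another wait, item 6, then end of stream.
fn delayed_script() -> Vec<Poll<Option<i32>>> {
    use Poll::{Pending, Ready};
    vec![
        Ready(Some(1)), Ready(Some(2)), Pending,
        Ready(Some(3)), Ready(Some(4)), Ready(Some(5)), Pending,
        Ready(Some(6)), Ready(None),
    ]
}
```

With this script, `drain(delayed_script(), 3, false)` yields `[[1, 2, 3], [4, 5, 6]]`, while `drain(delayed_script(), 3, true)` yields `[[1, 2], [3, 4, 5], [6]]`. The items are the same; only the chunk boundaries move.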

Latency Comparison

use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration, Instant};
 
async fn latency_comparison() {
    // Simulate a slow stream
    let stream = async_stream::stream! {
        yield 1;
        yield 2;
        // Delay simulates network/database wait
        sleep(Duration::from_millis(100)).await;
        yield 3;
    };
    
    let start = Instant::now();
    
    // chunks(3): Waits for all 3 items
    // Total latency: ~100ms (must wait for slow item)
    
    // ready_chunks(3): Returns [1, 2] immediately
    // Then waits and returns [3]
    // First result latency: ~0ms
    // Second result latency: ~100ms
    
    // ready_chunks has lower latency for first results
}

ready_chunks provides lower latency by not waiting for slow items.
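The latency claim can be checked with a small timeline calculation. This is a rough model rather than a measurement: it assumes the consumer is always polling, that processing takes no time, that items sharing a timestamp are ready together, and that the stream ends right after the last arrival. The function names and the `(arrival_ms, value)` encoding are hypothetical, and the capacity must be non-zero.

```rust
/// Emission times under a chunks-like policy: a batch leaves when its last
/// item arrives (or, for the final partial batch, when the stream ends).
fn chunks_timeline(arrivals: &[(u64, i32)], cap: usize) -> Vec<(u64, Vec<i32>)> {
    arrivals
        .chunks(cap)
        .map(|c| (c.last().unwrap().0, c.iter().map(|&(_, v)| v).collect()))
        .collect()
}

/// Emission times under a ready_chunks-like policy: a batch leaves as soon as
/// nothing more is available at that instant, or when it reaches `cap`.
fn ready_chunks_timeline(arrivals: &[(u64, i32)], cap: usize) -> Vec<(u64, Vec<i32>)> {
    let mut out: Vec<(u64, Vec<i32>)> = Vec::new();
    for &(t, v) in arrivals {
        if let Some((batch_time, batch)) = out.last_mut() {
            // Same instant and room left: the item joins the current batch.
            if *batch_time == t && batch.len() < cap {
                batch.push(v);
                continue;
            }
        }
        // Otherwise the previous batch has already been flushed.
        out.push((t, vec![v]));
    }
    out
}
```

For arrivals `[(0, 1), (0, 2), (100, 3)]` and a capacity of 3, the chunks-like policy produces a single batch at 100 ms, while the ready_chunks-like policy produces `[1, 2]` at 0 ms and `[3]` at 100 ms.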

Throughput vs Latency Trade-off

use futures::stream::{self, StreamExt};
 
async fn throughput_vs_latency() {
    // chunks: Higher throughput, higher latency
    // - Batches are always full (except last)
    // - Fewer batch processing calls
    // - Must wait for slow items
    
    // ready_chunks: Lower latency, potentially lower throughput
    // - Batches may be partial
    // - More batch processing calls
    // - Doesn't wait for slow items
    
    // Use chunks when:
    // - You want maximum batch efficiency
    // - Processing overhead is significant
    // - Latency is acceptable
    
    // Use ready_chunks when:
    // - Low latency is critical
    // - Partial batches are acceptable
    // - Items arrive at variable rates
}

Choose based on whether batch efficiency or latency matters more.
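The efficiency side of the trade-off is simple arithmetic when every batch pays a fixed overhead. The sketch below assumes a linear cost model with made-up numbers; `total_cost_ms` and its parameters are hypothetical and only illustrate why fewer, fuller batches cost less in total.

```rust
/// Total processing cost when each batch pays a fixed overhead plus a per-item cost.
fn total_cost_ms(batch_sizes: &[usize], per_batch_ms: u64, per_item_ms: u64) -> u64 {
    batch_sizes
        .iter()
        .map(|&n| per_batch_ms + per_item_ms * n as u64)
        .sum()
}
```

With 5 ms of overhead per batch and 1 ms per item, six items in two full batches (`[3, 3]`) cost 16 ms, while the same six items split as `[2, 3, 1]` cost 21 ms. The extra 5 ms is the price of the third call, paid in exchange for seeing the first two items sooner.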

Use Case: Batch Database Writes

use futures::stream::{self, StreamExt};
 
async fn batch_database_writes() {
    // Stream of records to insert
    let records = stream::iter(vec![
        Record { id: 1, data: "a" },
        Record { id: 2, data: "b" },
        Record { id: 3, data: "c" },
        Record { id: 4, data: "d" },
        Record { id: 5, data: "e" },
    ]);
    
    // Using chunks for efficient batching
    // Ensures batches are as full as possible
    let batches = records.chunks(3);
    
    // Process batches
    batches.for_each(|batch| async move {
        // Each INSERT statement handles up to 3 rows
        // database.insert_batch(&batch).await;
        println!("Inserting batch: {:?}", batch);
    }).await;
    
    // Output:
    // Inserting batch: [Record { id: 1, ... }, Record { id: 2, ... }, Record { id: 3, ... }]
    // Inserting batch: [Record { id: 4, ... }, Record { id: 5, ... }]
}
 
#[derive(Debug)]
struct Record {
    id: i32,
    data: &'static str,
}

Use chunks when you want maximum batch size for efficiency.
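To make the "one INSERT per batch" idea concrete, here is a minimal sketch of building a multi-row statement for a batch. Everything in it is an assumption for illustration: the `records` table, its `id` and `data` columns, the PostgreSQL-style `$n` placeholders, and the `insert_sql` helper itself. A real application would hand the statement and the bound values to its database driver.

```rust
/// Builds one parameterized multi-row INSERT for a batch of `rows` records.
/// Hypothetical schema: table `records` with columns `id` and `data`.
fn insert_sql(rows: usize) -> String {
    // Each row consumes two placeholders: ($1, $2), ($3, $4), ...
    let values: Vec<String> = (0..rows)
        .map(|i| format!("(${}, ${})", 2 * i + 1, 2 * i + 2))
        .collect();
    format!("INSERT INTO records (id, data) VALUES {}", values.join(", "))
}
```

Neither chunks nor ready_chunks emits an empty batch, so `rows` is at least 1 when the helper is called once per chunk. A batch of three rows becomes a single statement and a single round trip instead of three.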

Use Case: Responsive UI Updates

use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
 
async fn responsive_updates() {
    // Stream of UI events at variable rates
    let events = async_stream::stream! {
        yield Event::Click("button1");
        yield Event::Click("button2");
        // User thinks, then clicks again
        sleep(Duration::from_millis(500)).await;
        yield Event::Click("button3");
    };
    
    // Using ready_chunks for responsive updates
    // Process immediately available events without delay
    let batches = events.ready_chunks(10);
    
    batches.for_each(|batch| async move {
        // Process batch immediately
        // Don't wait for more events
        println!("Processing {} events", batch.len());
        for event in batch {
            println!("  Event: {:?}", event);
        }
    }).await;
    
    // First batch processed immediately
    // Second batch processed after user action
}
 
#[derive(Debug)]
enum Event {
    Click(&'static str),
}

Use ready_chunks when you want to process available items immediately.
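A synchronous analogue may help build intuition for "process what is available now". The sketch below uses a std channel instead of a stream: it blocks for the first item, then drains whatever is already queued without waiting, up to a cap. `recv_ready_batch` is a hypothetical helper, not part of any library, and it only mirrors the policy, not the async machinery.

```rust
use std::sync::mpsc::Receiver;

/// Blocks for the first item, then takes whatever else is already queued, up to `cap`.
/// Returns None once every sender is gone and the queue is empty.
fn recv_ready_batch(rx: &Receiver<i32>, cap: usize) -> Option<Vec<i32>> {
    let first = rx.recv().ok()?;
    let mut batch = vec![first];
    while batch.len() < cap {
        match rx.try_recv() {
            Ok(item) => batch.push(item),
            // Nothing queued right now (or the channel closed): return what we have.
            Err(_) => break,
        }
    }
    Some(batch)
}
```

If five items are queued before the first call and the sender is then dropped, successive calls with a cap of 3 return `[1, 2, 3]`, then `[4, 5]`, then `None`. A chunks-style policy would instead keep calling the blocking `recv` until the batch was full.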

Combining with Other Combinators

use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration, timeout};
 
async fn combined_combinators() {
    let stream = async_stream::stream! {
        yield 1;
        yield 2;
        sleep(Duration::from_millis(50)).await;
        yield 3;
        sleep(Duration::from_millis(50)).await;
        yield 4;
        yield 5;
    };
    
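    // async_stream streams are not Unpin, so pin before calling next()
    let mut batches = Box::pin(stream.ready_chunks(3));
    
    // Bound the wait for each batch, not for the whole stream.
    // A single timeout around collect() would discard every batch on expiry.
    while let Ok(Some(batch)) = timeout(Duration::from_millis(200), batches.next()).await {
        println!("batch: {:?}", batch);
    }
    
    // First batch: [1, 2], available immediately
    // Second batch, after the first delay: [3]
    // Third batch, after the second delay: [4, 5]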
}

ready_chunks works well with timeouts and other combinators.

Empty and Final Chunks

use futures::stream::{self, StreamExt};
 
async fn edge_cases() {
    // Empty stream
    let empty = stream::iter(Vec::<i32>::new());
    let chunks: Vec<Vec<i32>> = empty.chunks(3).collect().await;
    // chunks = []
    
    // Stream smaller than chunk size
    let small = stream::iter(vec![1, 2]);
    let chunks: Vec<Vec<i32>> = small.chunks(3).collect().await;
    // chunks = [[1, 2]] - partial final chunk
    
    // Stream exactly chunk size
    let exact = stream::iter(vec![1, 2, 3]);
    let chunks: Vec<Vec<i32>> = exact.chunks(3).collect().await;
    // chunks = [[1, 2, 3]]
    
    // Both chunks and ready_chunks handle these cases the same way
}

Both methods handle edge cases similarly.
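The same end-of-input conventions appear in std's `slice::chunks`, which makes them easy to check without an async runtime. This is only an analogy: `split` is a hypothetical helper over a slice, not the stream adapter, but the outcomes for the empty, short, and exact cases line up with the comments above.

```rust
/// Splits a slice into owned chunks of at most `cap` items using std's `slice::chunks`.
/// Like the stream adapters, it produces no empty chunks and allows a short final chunk.
fn split(items: &[i32], cap: usize) -> Vec<Vec<i32>> {
    items.chunks(cap).map(|c| c.to_vec()).collect()
}
```

An empty input gives no chunks, `[1, 2]` with a cap of 3 gives one partial chunk, and `[1, 2, 3]` gives exactly one full chunk with no trailing empty one.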

Performance Characteristics

use futures::stream::{self, StreamExt};
 
async fn performance() {
    // chunks:
    // - Allocates Vec with capacity = chunk_size
    // - Waits until buffer is full
    // - Fewer allocations overall
    // - Lower processing overhead
    
    // ready_chunks:
    // - Also allocates Vec with capacity = chunk_size
    // - May return partial Vecs
    // - More potential processing calls
    // - Lower latency per batch
    
    // For high-throughput, batch-oriented workloads:
    // chunks is more efficient
    
    // For latency-sensitive, variable-rate streams:
    // ready_chunks is more responsive
}

Choose based on whether throughput or latency is the priority.

Comparison Table

use futures::stream::StreamExt;
 
async fn comparison() {
    // ┌──────────────────┬────────────────────┬────────────────────┐
    // │ Aspect           │ chunks             │ ready_chunks       │
    // ├──────────────────┼────────────────────┼────────────────────┤
    // │ Buffer fill      │ Waits for capacity │ Returns on Pending │
    // │ Latency          │ Higher (waits)     │ Lower (immediate)  │
    // │ Batch efficiency │ Higher (fuller)    │ Lower (partial)    │
    // │ Pending handling │ Continues waiting  │ Returns buffer     │
    // │ Processing calls │ Fewer              │ More               │
    // │ Best for         │ Batch efficiency   │ Low latency        │
    // └──────────────────┴────────────────────┴────────────────────┘
}

Complete Example

use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
 
async fn complete_example() {
    println!("=== chunks example ===");
    
    // Simulated network stream with variable timing
    let stream = async_stream::stream! {
        println!("  Item 1 produced");
        yield 1;
        println!("  Item 2 produced");
        yield 2;
        println!("  Item 3 produced");
        yield 3;
        sleep(Duration::from_millis(50)).await;
        println!("  Item 4 produced (after delay)");
        yield 4;
        println!("  Item 5 produced");
        yield 5;
        println!("  Item 6 produced");
        yield 6;
    };
    
    // Using chunks(3)
    println!("Processing with chunks(3):");
    // async_stream streams are not Unpin, so pin before calling next()
    let mut chunk_stream = Box::pin(stream.chunks(3));
    while let Some(chunk) = chunk_stream.next().await {
        println!("  Received chunk: {:?}", chunk);
        // Batch processing happens here
    }
    // Output shows chunks waiting to fill buffer:
    // [1, 2, 3] - waits for 3rd item
    // [4, 5, 6] - waits for 6th item
    
    println!("\n=== ready_chunks example ===");
    
    let stream2 = async_stream::stream! {
        println!("  Item 1 produced");
        yield 1;
        println!("  Item 2 produced");
        yield 2;
        println!("  Item 3 produced");
        yield 3;
        sleep(Duration::from_millis(50)).await;
        println!("  Item 4 produced (after delay)");
        yield 4;
        println!("  Item 5 produced");
        yield 5;
        println!("  Item 6 produced");
        yield 6;
    };
    
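    // Using ready_chunks(3)
    println!("Processing with ready_chunks(3):");
    // async_stream streams are not Unpin, so pin before calling next()
    let mut ready_stream = Box::pin(stream2.ready_chunks(3));
    while let Some(chunk) = ready_stream.next().await {
        println!("  Received chunk: {:?}", chunk);
    }
    // Output shows ready_chunks returning available items:
    // [1, 2, 3] - all 3 were immediately available
    // [4, 5, 6] - these came after the delay
    // The result matches chunks(3) because the delay falls on a chunk boundary;
    // moving the sleep between items 2 and 3 would make ready_chunks emit [1, 2] first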
}

Summary

use futures::stream::StreamExt;
 
async fn summary() {
    // ┌─────────────────┬──────────────────────────┬──────────────────┐
    // │ Method          │ Behavior                 │ Use Case         │
    // ├─────────────────┼──────────────────────────┼──────────────────┤
    // │ chunks(n)       │ Waits for n items or end │ Batch efficiency │
    // │ ready_chunks(n) │ Returns on Pending       │ Low latency      │
    // └─────────────────┴──────────────────────────┴──────────────────┘
    
    // Key points:
    // 1. chunks waits until buffer is full (up to n items)
    // 2. ready_chunks returns when stream is Pending with available items
    // 3. Both return partial final chunks when stream ends
    // 4. chunks optimizes for throughput, ready_chunks for latency
    // 5. Use chunks for database batches, bulk processing
    // 6. Use ready_chunks for responsive UI, real-time processing
}

Key insight: The fundamental difference between chunks and ready_chunks is how they handle the Poll::Pending case. When the inner stream returns Pending, chunks continues waiting for more items to fill the buffer, prioritizing batch efficiency. ready_chunks returns the accumulated items immediately, prioritizing latency. For streams where all items are immediately available (like stream::iter), both methods behave similarly. The difference emerges with async sources where items arrive at different rates: ready_chunks processes available items without waiting for slow producers, while chunks ensures fuller batches at the cost of increased latency. Choose chunks for throughput-critical batch processing, and ready_chunks for latency-sensitive applications.