Loading pageā¦
Rust walkthroughs
Loading pageā¦
futures::stream::StreamExt::ready_chunks for batching ready items only?ready_chunks buffers items from a stream and yields them as a Vec when either the buffer reaches the specified capacity or the underlying stream returns Pending, meaning it batches items that are immediately available without waiting for the stream to produce more. Unlike chunks which always waits until the buffer is full before yielding, ready_chunks will return a partial batch when the stream isn't ready to produce more items immediately. This is useful when you want to batch items for efficiency but don't want to introduce artificial latency waiting for a full batch when items are being produced slowly or intermittently.
use futures::stream::{self, StreamExt};
async fn basic_usage() {
// Create a stream with items
let stream = stream::iter(vec
![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
// Batch items into chunks of up to 3
let mut chunked = stream.ready_chunks(3);
// ready_chunks yields Vec<T> instead of individual items
while let Some(chunk) = chunked.next().await {
println!("Chunk: {:?}", chunk);
}
// Output depends on stream readiness:
// Chunk: [1, 2, 3]
// Chunk: [4, 5, 6]
// Chunk: [7, 8, 9]
// Chunk: [10]
}ready_chunks collects items and yields them as vectors, with the last chunk potentially smaller than the capacity.
use futures::stream::{self, StreamExt};
async fn chunks_comparison() {
// chunks: waits until buffer is full or stream ends
let stream = stream::iter(vec
![1, 2, 3, 4, 5]);
let mut chunked = stream.chunks(3);
// chunks always yields full batches (except last)
// Even if items arrive slowly, it waits for 3
// ready_chunks: yields when stream isn't ready or buffer full
let stream2 = stream::iter(vec
![1, 2, 3, 4, 5]);
let mut ready_chunked = stream2.ready_chunks(3);
// ready_chunks yields when:
// 1. Buffer has exactly 3 items (full)
// 2. Stream returns Pending (not ready to produce more)
// 3. Stream ends
// Key difference with synchronous streams:
// Both will yield the same chunks because iter() is always ready
// The difference shows with async sources that may not be ready
}chunks always waits for a full batch; ready_chunks yields early if the stream isn't ready.
use futures::stream::{Stream, StreamExt};
use std::time::Duration;
use tokio::time::sleep;
async fn async_stream_behavior() {
// Simulated async stream with delays
async fn produce_items() -> impl Stream<Item = i32> {
stream::unfold(0, |mut count| async move {
if count >= 10 {
None
} else {
count += 1;
// Simulate variable readiness
if count % 3 == 0 {
// Every 3rd item has a delay (stream not ready)
sleep(Duration::from_millis(10)).await;
}
Some((count, count))
}
})
}
let stream = produce_items();
let mut chunked = stream.ready_chunks(3);
// With ready_chunks:
// - Items 1, 2 arrive immediately (stream ready)
// - Item 3 has delay, stream becomes "not ready"
// - ready_chunks yields [1, 2] before item 3
// - Then item 3 arrives, stream ready again
// - Items 3, 4, 5 (if ready) batch together
while let Some(chunk) = chunked.next().await {
println!("Chunk: {:?}", chunk);
}
}With async streams, ready_chunks yields partial batches when the stream temporarily can't produce items.
use futures::stream::{Stream, StreamExt};
use std::task::{Context, Poll};
use std::pin::Pin;
// Custom stream that sometimes returns Pending
struct IntermittentStream {
items: Vec<i32>,
position: usize,
call_count: usize,
}
impl Stream for IntermittentStream {
type Item = i32;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>
{
self.call_count += 1;
// Every 3rd call, return Pending to simulate slow source
if self.call_count % 3 == 0 && self.position < self.items.len() {
// Stream is not ready right now
cx.waker().wake_by_ref(); // Schedule immediate wake
return Poll::Pending;
}
if self.position < self.items.len() {
let item = self.items[self.position];
self.position += 1;
Poll::Ready(Some(item))
} else {
Poll::Ready(None)
}
}
}
async fn pending_behavior() {
let stream = IntermittentStream {
items: vec
![1, 2, 3, 4, 5, 6, 7, 8, 9],
position: 0,
call_count: 0,
};
let mut chunked = stream.ready_chunks(5);
// ready_chunks behavior:
// - Polls stream, gets items until Pending
// - When Pending received, yields buffered items
// - This means partial batches on Pending
while let Some(chunk) = chunked.next().await {
println!("Partial chunk: {:?}", chunk);
}
}When the stream returns Pending, ready_chunks yields whatever it has buffered.
use futures::stream::{Stream, StreamExt};
use tokio::sync::mpsc;
async fn variable_availability() {
let (tx, mut rx) = mpsc::channel::<i32>(100);
// Producer sends items at varying rates
tokio::spawn(async move {
// Burst of items
for i in 0..5 {
tx.send(i).await.unwrap();
}
// Pause (stream not ready during this time)
tokio::time::sleep(Duration::from_millis(10)).await;
// Another burst
for i in 5..10 {
tx.send(i).await.unwrap();
}
});
// Convert receiver to stream
let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
let mut chunked = stream.ready_chunks(10);
// With ready_chunks(10):
// - First burst: items 0-4 arrive quickly
// - Pause: stream returns Pending
// - ready_chunks yields [0, 1, 2, 3, 4] (partial batch)
// - Second burst: items 5-9 arrive
// - Stream ends
// - ready_chunks yields [5, 6, 7, 8, 9]
while let Some(chunk) = chunked.next().await {
println!("Batch: {:?}", chunk);
}
}ready_chunks handles bursts by yielding partial batches between bursts.
use futures::stream::{Stream, StreamExt};
struct Record {
id: u32,
data: String,
}
async fn batch_inserts(records: impl Stream<Item = Record>) {
// Batch records for efficient database insertion
// Don't wait for full batch if stream is slow
let mut chunked = records.ready_chunks(100);
while let Some(batch) = chunked.next().await {
// Insert batch into database
// Even if batch < 100, proceed because stream was slow
println!("Inserting {} records", batch.len());
// database.insert_batch(&batch).await;
// Benefit: If records arrive slowly (e.g., from network),
// we don't wait for 100 records before processing
// We process what's available when the source pauses
}
}
async fn database_example() {
let records = stream::iter(vec
![
Record { id: 1, data: "a".to_string() },
Record { id: 2, data: "b".to_string() },
// ... more records ...
]);
batch_inserts(records).await;
}Batch inserts without waiting for full batches when data arrives slowly.
use futures::stream::{Stream, StreamExt};
use tokio::net::TcpStream;
use tokio_util::codec::{Framed, LinesCodec};
async fn network_batching() {
// Read lines from TCP connection
// Batch them for processing without adding latency
// let socket = TcpStream::connect("example.com:8080").await.unwrap();
// let framed = Framed::new(socket, LinesCodec::new());
// Simulated stream instead
let messages = stream::iter(vec
![
"msg1".to_string(),
"msg2".to_string(),
"msg3".to_string(),
]);
let mut batched = messages.ready_chunks(10);
// ready_chunks will:
// - Collect up to 10 messages
// - Yield immediately if network pauses (Pending)
// - Not add artificial latency waiting for 10 messages
while let Some(batch) = batched.next().await {
println!("Processing batch of {} messages", batch.len());
for msg in batch {
// Process each message
println!(" - {}", msg);
}
}
}Network messages are batched without adding latency when traffic is bursty.
use futures::stream::{self, StreamExt};
async fn buffering_strategies() {
// Strategy 1: Process one at a time (no batching)
let stream = stream::iter(vec
![1, 2, 3, 4, 5]);
let mut single = stream;
while let Some(item) = single.next().await {
println!("Single: {}", item);
}
// Pros: Immediate processing
// Cons: High overhead per item
// Strategy 2: chunks (wait for full batch)
let stream = stream::iter(vec
![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
let mut chunked = stream.chunks(3);
while let Some(chunk) = chunked.next().await {
println!("Chunk: {:?}", chunk);
}
// Pros: Consistent batch sizes
// Cons: Waits for full batch even if stream is slow
// Strategy 3: ready_chunks (yield when not ready)
let stream = stream::iter(vec
![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
let mut ready_chunked = stream.ready_chunks(3);
while let Some(chunk) = ready_chunked.next().await {
println!("Ready chunk: {:?}", chunk);
}
// Pros: Batches when possible, doesn't wait when slow
// Cons: Variable batch sizes
}Choose based on whether you prioritize consistent batch sizes or low latency.
use futures::stream::{Stream, StreamExt};
async fn performance_characteristics() {
// ready_chunks allocates a Vec for each batch
// Vec capacity is the specified chunk size
// With fast, synchronous streams:
// - ready_chunks behaves like chunks
// - Both yield full batches (except last)
// - Similar performance
// With slow, async streams:
// - ready_chunks yields partial batches
// - Lower latency (doesn't wait for full batch)
// - More batches (smaller average size)
// - Higher total overhead (more Vec allocations)
// Memory usage:
// - Both buffer up to chunk_size items
// - ready_chunks may allocate more Vecs (smaller batches)
// Example: 1000 items, chunk size 100
// chunks: 10 Vecs of 100 items each
// ready_chunks (with pauses): Maybe 15-20 Vecs of varying sizes
}ready_chunks trades consistent batch sizes for lower latency on slow streams.
use futures::stream::{self, StreamExt};
async fn edge_cases() {
// Empty stream
let empty: stream::Empty<i32> = stream::empty();
let mut chunked = empty.ready_chunks(10);
assert_eq!(chunked.next().await, None);
// Single item
let single = stream::iter(vec
![42]);
let mut chunked = single.ready_chunks(10);
assert_eq!(chunked.next().await, Some(vec
![42]));
assert_eq!(chunked.next().await, None);
// Fewer items than chunk size
let few = stream::iter(vec
![1, 2, 3]);
let mut chunked = few.ready_chunks(10);
assert_eq!(chunked.next().await, Some(vec
![1, 2, 3]));
assert_eq!(chunked.next().await, None);
// Chunk size of 1
let stream = stream::iter(vec
![1, 2, 3]);
let mut chunked = stream.ready_chunks(1);
// Yields each item as single-element Vec
// Behaves like map(|x| vec
![x])
}Edge cases: empty streams yield nothing; small streams yield one partial batch.
use futures::stream::{Stream, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};
// Conceptual implementation of ready_chunks
struct ReadyChunks<S> {
stream: S,
items: Vec<S::Item>,
capacity: usize,
}
impl<S: Stream + Unpin> Stream for ReadyChunks<S> {
type Item = Vec<S::Item>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>
{
// Try to fill the buffer
loop {
if self.items.len() >= self.capacity {
// Buffer full, yield it
let items = std::mem::take(&mut self.items);
return Poll::Ready(Some(items));
}
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Ready(Some(item)) => {
self.items.push(item);
// Continue polling while items are ready
}
Poll::Ready(None) => {
// Stream ended, yield remaining items
if self.items.is_empty() {
return Poll::Ready(None);
}
let items = std::mem::take(&mut self.items);
return Poll::Ready(Some(items));
}
Poll::Pending => {
// Stream not ready, yield what we have
if self.items.is_empty() {
return Poll::Pending;
}
let items = std::mem::take(&mut self.items);
return Poll::Ready(Some(items));
}
}
}
}
}
// Key insight: When stream returns Pending and buffer has items,
// ready_chunks yields the buffer immediatelyThe key behavior: yield buffered items when Pending is received.
use futures::stream::StreamExt;
async fn when_to_use() {
// Use ready_chunks when:
// 1. Items arrive in bursts with gaps
// (don't wait for full batch during gaps)
// 2. Low latency is important
// (process items as soon as stream pauses)
// 3. Processing is expensive, batching helps
// (but don't add artificial latency)
// 4. Stream may be slow or intermittent
// (yield what's available rather than waiting)
// Use chunks when:
// 1. Consistent batch sizes matter
// 2. Bounded latency is acceptable
// 3. Predictable memory usage is needed
// 4. Items arrive at consistent rate
}Choose ready_chunks for bursty streams where latency matters; chunks for consistent batching.
use futures::stream::{self, StreamExt};
async fn combined_with_other_combinators() {
let stream = stream::iter(vec
![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
// ready_chunks after map
let processed = stream.map(|x| x * 2)
.ready_chunks(3);
// ready_chunks before filter (filter after batching)
let stream = stream::iter(vec
![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
let filtered = stream.ready_chunks(3)
.map(|chunk| chunk.into_iter().filter(|x| x % 2 == 0).collect::<Vec<_>>());
// ready_chunks with timeout
// use futures::stream::StreamExt;
// let with_timeout = stream.ready_chunks(3)
// .take_until(tokio::time::sleep(Duration::from_secs(5)));
}ready_chunks integrates with other stream combinators.
use futures::stream::{Stream, StreamExt};
use std::time::Duration;
#[derive(Debug)]
struct Event {
id: u64,
event_type: String,
payload: String,
}
async fn event_processing() {
// Simulate event stream with variable rates
async fn event_stream() -> impl Stream<Item = Event> {
stream::unfold(0u64, |mut count| async move {
if count >= 20 {
None
} else {
// Simulate variable processing time
if count % 5 == 0 && count > 0 {
tokio::time::sleep(Duration::from_millis(5)).await;
}
count += 1;
Some((
Event {
id: count,
event_type: format!("type_{}", count % 3),
payload: format!("payload_{}", count),
},
count,
))
}
})
}
let stream = event_stream();
let mut batched = stream.ready_chunks(10);
// Process events in batches
// ready_chunks yields partial batches when stream pauses
while let Some(batch) = batched.next().await {
println!("Processing batch of {} events", batch.len());
// Group by event type
let mut grouped: std::collections::HashMap<String, Vec<&Event>> =
std::collections::HashMap::new();
for event in &batch {
grouped.entry(event.event_type.clone())
.or_insert_with(Vec::new)
.push(event);
}
// Process each group
for (event_type, events) in grouped {
println!(" {}: {} events", event_type, events.len());
}
}
}Event processing batches events for efficiency while yielding during pauses.
Key behavior summary:
// ready_chunks yields when:
// 1. Buffer reaches capacity (full batch)
// 2. Stream returns Pending (not ready)
// 3. Stream ends (remaining items)
// This differs from chunks which only yields when:
// 1. Buffer reaches capacity
// 2. Stream ends
// The Pending case is the key differencePerformance trade-offs:
// ready_chunks advantages:
// - Lower latency on slow/intermittent streams
// - Doesn't block waiting for full batches
// - Natural for bursty data sources
// ready_chunks disadvantages:
// - Variable batch sizes (unpredictable)
// - More Vec allocations (smaller batches)
// - Less efficient for consistently fast streams
// chunks advantages:
// - Consistent batch sizes
// - Predictable memory allocation
// - Better for consistently fast streams
// chunks disadvantages:
// - Higher latency on slow streams
// - Waits for full batch even during gapsKey insight: ready_chunks is designed for scenarios where batching is beneficial for efficiency but you don't want to introduce artificial latency waiting for full batches. When a stream temporarily can't produce more items (returns Pending), ready_chunks yields whatever it has buffered rather than waiting. This makes it ideal for bursty data sources like network connections, file I/O, or event streams where data arrives in groups with gaps. The trade-off is variable batch sizesāyou might get a full batch of 100 items, or you might get a partial batch of 10 items if the source pauses. For scenarios where consistent batch sizes are more important than low latency, use chunks instead.