Rust walkthroughs
How does hyper::body::BodyDataStream handle backpressure for streaming response bodies?

BodyDataStream (the data-stream adapter that the http-body-util crate provides over a hyper body) implements backpressure through the semantics of Rust's futures::Stream trait: polling for the next chunk is an explicit action that happens only when the consumer is ready to process data. The stream does not push data on its own. Instead, the consumer must call poll_next (usually through .next().await), which creates natural backpressure: if the consumer isn't polling, no further chunks are handed to it. This is fundamentally different from callback-based streaming, where data arrives regardless of consumer readiness.

BodyDataStream sits on top of the connection's own flow control: TCP receive windows for HTTP/1.1, and stream-level flow control (in addition to TCP) for HTTP/2. When the consumer stops polling, the connection's buffers fill up, which eventually causes the sender to pause transmission. The stream yields Bytes chunks as they become available, and the consumer's polling rate directly controls the data flow rate, creating end-to-end backpressure from the application layer down to the network layer.
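// BodyDataStream is provided by the http-body-util crate and is generic over the body type;
// this alias fixes it to hyper's Incoming body so the signatures below stay short.
type BodyDataStream = http_body_util::BodyDataStream<hyper::body::Incoming>;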
use futures::StreamExt;
// Backpressure problem:
// - Producer (network) can send data faster than consumer can process
// - Without backpressure, data buffers in memory
// - Unbounded buffering leads to OOM
// With callback-style streaming (no backpressure):
// fn on_data(chunk: Bytes) {
// // Data arrives whether we're ready or not
// // Must buffer if we can't process immediately
// }
// With Stream-based backpressure:
// async fn process(mut stream: BodyDataStream) {
// // We control when data arrives by polling
// while let Some(result) = stream.next().await {
// // Only receive chunks when we ask for them
// match result {
// Ok(chunk) => process_chunk(chunk),
// Err(_) => break,
// }
// }
// }

Stream-based backpressure makes data pulling explicit, preventing unbounded buffering.
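The pull model can be seen without hyper at all. The sketch below is a std-only analogy, not hyper code: `CountingSource` and `pull_some` are hypothetical names invented for illustration, and a lazy iterator stands in for the stream. The counter shows that the producer generates a chunk only when the consumer asks for one.

```rust
/// Hypothetical stand-in for a pull-based body stream: it produces a chunk
/// only when `next()` is called, and counts how many it has produced.
struct CountingSource {
    produced: usize,
    remaining: usize,
}

impl CountingSource {
    fn new(total_chunks: usize) -> Self {
        CountingSource { produced: 0, remaining: total_chunks }
    }
}

impl Iterator for CountingSource {
    type Item = Vec<u8>;

    fn next(&mut self) -> Option<Vec<u8>> {
        if self.remaining == 0 {
            return None;
        }
        self.remaining -= 1;
        self.produced += 1;
        // Each chunk is 4 bytes of dummy payload.
        Some(vec![0u8; 4])
    }
}

/// Pulls at most `limit` chunks, then reports how many chunks the source
/// has produced in total so far.
fn pull_some(source: &mut CountingSource, limit: usize) -> usize {
    for _chunk in source.by_ref().take(limit) {
        // A real consumer would process the chunk here.
    }
    source.produced
}
```

Calling `pull_some(&mut source, 3)` on a source of ten chunks leaves seven chunks unproduced: nothing is generated ahead of demand, which is the property the async `Stream` version relies on.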
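// BodyDataStream is provided by the http-body-util crate and is generic over the body type;
// this alias fixes it to hyper's Incoming body so the signatures below stay short.
type BodyDataStream = http_body_util::BodyDataStream<hyper::body::Incoming>;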
use futures::StreamExt;
async fn stream_response(mut stream: BodyDataStream) {
// BodyDataStream implements futures::Stream
// Each next().await pulls one chunk
while let Some(result) = stream.next().await {
match result {
Ok(bytes) => {
// We received a chunk
println!("Received {} bytes", bytes.len());
// Process it (could be slow)
process_chunk(&bytes).await;
}
Err(e) => {
eprintln!("Error receiving chunk: {}", e);
break;
}
}
}
// Stream ends when next() returns None
println!("Stream complete");
}
async fn process_chunk(bytes: &[u8]) {
// Simulate slow processing
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}

Each next().await is an explicit pull, creating backpressure automatically.
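// BodyDataStream is provided by the http-body-util crate and is generic over the body type;
// this alias fixes it to hyper's Incoming body so the signatures below stay short.
type BodyDataStream = http_body_util::BodyDataStream<hyper::body::Incoming>;
// StreamExt supplies .next(), which calls Stream::poll_next under the hood
use futures::StreamExt;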
async fn explain_backpressure(mut stream: BodyDataStream) {
// Behind the scenes, .next() calls poll_next()
// The consumer controls the pace:
// 1. Consumer calls stream.next().await
// 2. Runtime polls stream with poll_next()
// 3. Stream returns Poll::Ready(Some(chunk)) if data available
// 4. OR Poll::Pending if no data available yet
// 5. Consumer processes chunk, then calls next() again
// If consumer is slow:
// - Calls to next() are infrequent
// - Stream is polled less often
// - Connection buffers fill up
// - Sender sees full buffers, stops sending
// - Backpressure propagates upstream
// If consumer stops calling next():
// - Stream is never polled
// - Connection layer sees no reads
// - TCP window fills, sender pauses
while let Some(result) = stream.next().await {
match result {
Ok(bytes) => println!("Got {} bytes", bytes.len()),
Err(e) => {
eprintln!("Error: {}", e);
// Stop after the first error instead of polling a failed body again
break;
}
}
}
}

The polling model means no polling = no data flow = backpressure.
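The numbered steps in the comments above can be traced by driving a poll-based source by hand. This is a minimal std-only sketch: `PollSource`, `NoopWake`, and `drain` are hypothetical names, the source merely imitates the shape of `Stream::poll_next`, and the busy loop stands in for an async runtime.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for a hand-driven poll loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical poll-based source that alternates between
/// "no data yet" (Pending) and "one value ready".
struct PollSource {
    next_value: u32,
    limit: u32,
    ready: bool,
    polls: u32,
}

impl PollSource {
    fn new(limit: u32) -> Self {
        PollSource { next_value: 0, limit, ready: false, polls: 0 }
    }

    /// Same shape as `Stream::poll_next`, minus the pinning.
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        self.polls += 1;
        if self.next_value == self.limit {
            return Poll::Ready(None);
        }
        if !self.ready {
            // Pretend the data arrives later and ask to be polled again.
            self.ready = true;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        self.ready = false;
        let value = self.next_value;
        self.next_value += 1;
        Poll::Ready(Some(value))
    }
}

/// Drives the source to completion; returns the values and the poll count.
fn drain(limit: u32) -> (Vec<u32>, u32) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut source = PollSource::new(limit);
    let mut values = Vec::new();
    loop {
        match source.poll_next(&mut cx) {
            Poll::Ready(Some(value)) => values.push(value),
            Poll::Ready(None) => break,
            // A runtime would park the task here until the waker fires.
            Poll::Pending => continue,
        }
    }
    (values, source.polls)
}
```

The source does work only inside `poll_next`, so the number of polls bounds how much it can ever produce. If the loop in `drain` stopped early, the remaining values would simply never be generated.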
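// BodyDataStream is provided by the http-body-util crate and is generic over the body type;
// this alias fixes it to hyper's Incoming body so the signatures below stay short.
type BodyDataStream = http_body_util::BodyDataStream<hyper::body::Incoming>;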
use futures::StreamExt;
// HTTP/1.1 backpressure works through TCP flow control:
// - TCP has receive window (typically 64KB-1MB)
// - When application doesn't read, receive window fills
// - When window is full, TCP advertises zero window
// - Sender must pause until window opens
async fn http1_backpressure(mut stream: BodyDataStream) {
// For HTTP/1.1, BodyDataStream is connected to TCP socket
// Read some data
if let Some(Ok(chunk)) = stream.next().await {
println!("Got {} bytes", chunk.len());
// If we don't call next() for a while:
// - TCP buffers fill up
// - Sender's write() calls start blocking
// - Or sender's send buffer fills
// Simulate slow processing
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
// Only after this delay do we poll again
if let Some(Ok(chunk)) = stream.next().await {
println!("Got {} more bytes", chunk.len());
}
}
// If the sender has more data queued than the buffers can hold,
// the 1-second delay can cause:
// 1. The TCP receive buffer to fill
// 2. A zero window to be advertised to the sender
// 3. The sender to pause transmission
}

HTTP/1.1 backpressure relies on TCP flow control windows.
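// BodyDataStream is provided by the http-body-util crate and is generic over the body type;
// this alias fixes it to hyper's Incoming body so the signatures below stay short.
type BodyDataStream = http_body_util::BodyDataStream<hyper::body::Incoming>;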
use futures::StreamExt;
// HTTP/2 has stream-level flow control in addition to TCP flow control:
// - Each stream has its own flow control window
// - WINDOW_UPDATE frames advertise available buffer space
// - DATA frames can only be sent when window is open
async fn http2_flow_control(mut stream: BodyDataStream) {
// For HTTP/2, BodyDataStream is connected to an HTTP/2 stream
// The stream maintains a flow control window
// When we read data:
// 1. We receive DATA frames
// 2. Window decreases
// 3. We process the data
// 4. We call next() again
// 5. HTTP/2 layer sends WINDOW_UPDATE
// 6. Window opens, more data can arrive
while let Some(Ok(chunk)) = stream.next().await {
println!("Received {} bytes", chunk.len());
// The HTTP/2 connection tracks:
// - Connection-level window (all streams share)
// - Stream-level window (per stream)
// When we read, windows are consumed
// When window is exhausted, sender must wait
// Process chunk...
}
// HTTP/2 flow control advantages:
// - Per-stream backpressure (one slow stream doesn't block others)
// - More granular control than TCP
// - Sender can multiplex across streams
}

HTTP/2 provides stream-level flow control for finer-grained backpressure.
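The window bookkeeping described in the comments above can be modeled in a few lines. This is a simplified, std-only sketch: `StreamWindow` and its methods are hypothetical names and are not hyper or h2 APIs. The model also assumes every read releases capacity immediately, whereas real implementations may batch WINDOW_UPDATE frames. The value 65,535 used in the usage note is the default initial window size from the HTTP/2 specification.

```rust
/// Simplified model of one HTTP/2 stream's receive window.
/// Illustrative only; not a hyper or h2 type.
struct StreamWindow {
    /// Bytes the sender is still allowed to send.
    available: u32,
    /// Bytes received but not yet read by the application.
    unread: u32,
}

impl StreamWindow {
    fn new(initial: u32) -> Self {
        StreamWindow { available: initial, unread: 0 }
    }

    /// Sender side: try to send `len` bytes of DATA.
    /// Returns how many bytes the window actually allowed.
    fn send_data(&mut self, len: u32) -> u32 {
        let sent = len.min(self.available);
        self.available -= sent;
        self.unread += sent;
        sent
    }

    /// Receiver side: the application reads up to `len` buffered bytes.
    /// In this model the read releases capacity right away, which plays
    /// the role of a WINDOW_UPDATE frame.
    fn read(&mut self, len: u32) -> u32 {
        let read = len.min(self.unread);
        self.unread -= read;
        self.available += read;
        read
    }
}
```

With `StreamWindow::new(65_535)`, a sender that pushes 60,000 bytes twice gets only 5,535 bytes through on the second attempt and nothing after that. Capacity returns only when the application calls `read`, which is the per-stream backpressure described above.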
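// BodyDataStream is provided by the http-body-util crate and is generic over the body type;
// this alias fixes it to hyper's Incoming body so the signatures below stay short.
type BodyDataStream = http_body_util::BodyDataStream<hyper::body::Incoming>;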
use futures::StreamExt;
async fn buffer_behavior(mut stream: BodyDataStream) {
// The connection layer has buffers:
// - TCP socket buffer (OS managed)
// - Hyper's internal buffer (for framing)
// These buffers create "slack" in backpressure:
// - Sender can send until buffers are full
// - Only then does backpressure kick in
// The amount of buffered data depends on:
// - TCP receive buffer size (sysctl net.ipv4.tcp_rmem on Linux)
// - Hyper's HTTP/1 read buffer limit (max_buf_size on the connection builder)
// - HTTP/2 flow control window sizes (SETTINGS_INITIAL_WINDOW_SIZE)
// Example: Large buffer size
// - More data buffered before backpressure
// - Higher memory usage
// - Less frequent pauses for sender
// Small buffer size:
// - Quicker backpressure propagation
// - Lower memory usage
// - More frequent pauses
// Consume the stream
while let Some(Ok(_)) = stream.next().await {
// Each iteration processes one chunk
// If we're slow, buffers fill up
}
}

Network buffers create some slack before backpressure takes effect.
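// BodyDataStream is provided by the http-body-util crate and is generic over the body type;
// this alias fixes it to hyper's Incoming body so the signatures below stay short.
type BodyDataStream = http_body_util::BodyDataStream<hyper::body::Incoming>;
use futures::StreamExt;
use tokio::sync::mpsc;
// WRONG: No backpressure (unbounded channel)
async fn unbounded_stream(mut stream: BodyDataStream) {
let (tx, mut rx) = mpsc::unbounded_channel::<hyper::body::Bytes>();
// This is dangerous: send() never waits, so the reader task drains
// the body as fast as the network delivers it
tokio::spawn(async move {
while let Some(Ok(chunk)) = stream.next().await {
if tx.send(chunk).is_err() {
break; // Receiver dropped
}
}
});
// If this consumer is slow, chunks pile up in the unbounded channel
// and memory grows without bound
// This defeats the backpressure from BodyDataStream
while let Some(chunk) = rx.recv().await {
process_chunk(&chunk).await; // slow consumer from the earlier example
}
}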
// CORRECT: Bounded channel with backpressure
async fn bounded_stream(mut stream: BodyDataStream) {
let (tx, mut rx) = mpsc::channel::<hyper::body::Bytes>(16);
// A bounded channel respects backpressure:
// tx.send().await waits when the channel is full, so the reader task
// stops polling the body until the consumer catches up
tokio::spawn(async move {
while let Some(Ok(chunk)) = stream.next().await {
// Waits if channel is full
if tx.send(chunk).await.is_err() {
break; // Receiver dropped
}
}
// tx is dropped here, which ends the consumer loop below
});
// Consumer
while let Some(chunk) = rx.recv().await {
process_chunk(&chunk).await; // slow consumer from the earlier example
}
}

Use bounded channels to preserve backpressure across async boundaries.
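The same effect can be observed with the standard library's bounded channel. The sketch below uses `std::sync::mpsc::sync_channel` and threads as a stand-in for tokio's bounded `mpsc::channel`; the function names are hypothetical and chosen only for this illustration.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

/// Tries to queue `attempts` chunks into a bounded channel of size
/// `capacity` while nobody is receiving. Returns how many were accepted.
fn fill_bounded_channel(capacity: usize, attempts: usize) -> usize {
    // `_rx` keeps the receiving end alive without reading from it.
    let (tx, _rx) = sync_channel::<Vec<u8>>(capacity);
    let mut accepted = 0;
    for _ in 0..attempts {
        match tx.try_send(vec![0u8; 1024]) {
            Ok(()) => accepted += 1,
            // Channel full: a blocking `send` would wait at this point,
            // which is the backpressure signal.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}

/// Moves `chunks` chunks of 1 KiB through a bounded channel and returns
/// the number of bytes the consumer received.
fn pipe_through_bounded(chunks: usize, capacity: usize) -> usize {
    let (tx, rx) = sync_channel::<Vec<u8>>(capacity);
    let producer = thread::spawn(move || {
        for _ in 0..chunks {
            // Blocks whenever `capacity` chunks are already queued.
            if tx.send(vec![0u8; 1024]).is_err() {
                break; // Receiver dropped
            }
        }
        // `tx` is dropped here, which ends the consumer loop.
    });
    let mut received = 0;
    for chunk in rx {
        received += chunk.len();
    }
    producer.join().unwrap();
    received
}
```

`fill_bounded_channel(16, 100)` accepts exactly 16 chunks before the channel reports that it is full, so at most `capacity` chunks are ever in flight between the two sides. An unbounded channel has no such ceiling.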
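// BodyDataStream is provided by the http-body-util crate and is generic over the body type;
// this alias fixes it to hyper's Incoming body so the signatures below stay short.
type BodyDataStream = http_body_util::BodyDataStream<hyper::body::Incoming>;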
use futures::StreamExt;
async fn chunk_sizes(mut stream: BodyDataStream) {
// Chunk sizes are determined by:
// - HTTP/1.1: chunk boundaries in Transfer-Encoding: chunked
// - HTTP/2: DATA frame boundaries (max 16KB typically)
// - Internal hyper buffering
while let Some(Ok(chunk)) = stream.next().await {
println!("Chunk size: {} bytes", chunk.len());
// Chunk sizes vary:
// - May be as small as 1 byte, or as large as hyper's read buffer allows for HTTP/1.1
// - HTTP/2 DATA frames are limited to SETTINGS_MAX_FRAME_SIZE
// For backpressure, chunk size affects:
// - Granularity of flow control
// - Frequency of poll_next calls
// Large chunks = fewer polls, more data per poll
// Small chunks = more polls, finer-grained control
}
}

Chunk size affects the granularity of backpressure control.
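// BodyDataStream is provided by the http-body-util crate and is generic over the body type;
// this alias fixes it to hyper's Incoming body so the signatures below stay short.
type BodyDataStream = http_body_util::BodyDataStream<hyper::body::Incoming>;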
use futures::StreamExt;
async fn process_with_backpressure(mut stream: BodyDataStream) {
// Example: Process data with rate limiting
let mut total_bytes = 0usize;
let start = std::time::Instant::now();
while let Some(Ok(chunk)) = stream.next().await {
total_bytes += chunk.len();
// Process the chunk
let processed = process_in_memory(&chunk);
// If processing is slow, the loop iteration is slow
// next().await is called less frequently
// Backpressure naturally propagates
println!(
"Processed {} bytes, total: {}, rate: {:.2} MB/s",
chunk.len(),
total_bytes,
total_bytes as f64 / start.elapsed().as_secs_f64() / 1_000_000.0
);
// The rate here is controlled by:
// 1. Network delivery rate
// 2. Our processing rate
// 3. Flow control windows
}
}
fn process_in_memory(bytes: &[u8]) -> Vec<u8> {
// Simulate some processing
bytes.to_vec()
}

Processing speed directly affects data flow through backpressure.
// BodyDataStream is provided by the http-body-util crate and is generic over the body type;
// this alias fixes it to hyper's Incoming body so the signatures below stay short.
type BodyDataStream = http_body_util::BodyDataStream<hyper::body::Incoming>;
use futures::{Stream, StreamExt};
use hyper::body::Bytes;
use std::time::Duration;
// BodyDataStream can be combined with other stream adapters
// that preserve backpressure. Each adapter takes ownership of the
// stream, so each one is shown as its own function.
// Map each chunk (backpressure preserved)
fn mapped(stream: BodyDataStream) -> impl Stream<Item = Result<Vec<u8>, hyper::Error>> {
stream.map(|result| {
result.map(|bytes| {
// Transform chunk
bytes.iter().map(|&b| b.wrapping_add(1)).collect::<Vec<_>>()
})
})
}
// Note: StreamExt::buffered(n) applies to streams of futures, not to a
// stream of chunks; to hold a bounded number of chunks, use a bounded channel
// Throttle processing (adds artificial backpressure)
fn throttled(stream: BodyDataStream) -> impl Stream<Item = Result<Bytes, hyper::Error>> {
async_stream::stream! {
let mut stream = stream;
while let Some(result) = stream.next().await {
yield result;
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
}
// Both preserve backpressure because:
// - Each pulls from upstream only when downstream asks
// - The chain ends at BodyDataStream's poll_next

Stream combinators that pull preserve backpressure naturally.
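// BodyDataStream is provided by the http-body-util crate and is generic over the body type;
// this alias fixes it to hyper's Incoming body so the signatures below stay short.
type BodyDataStream = http_body_util::BodyDataStream<hyper::body::Incoming>;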
use futures::StreamExt;
use std::time::Duration;
async fn with_timeout(mut stream: BodyDataStream) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
let mut data = Vec::new();
// Add a per-chunk timeout to detect stalled streams
// (not backpressure itself: it bounds how long we wait on a slow sender)
loop {
match tokio::time::timeout(Duration::from_secs(30), stream.next()).await {
Ok(Some(Ok(chunk))) => {
data.extend_from_slice(&chunk);
}
Ok(Some(Err(e))) => {
// Stream error (connection issue)
return Err(Box::new(e));
}
Ok(None) => {
// Stream complete
return Ok(data);
}
Err(_) => {
// Timeout - sender stopped sending
return Err("Stream timeout".into());
}
}
}
}

Timeouts complement backpressure by bounding how long the consumer waits on a stalled connection.
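The three outcomes handled above (chunk received, stream complete, timeout) have a direct std-only analogue. In this sketch a `std::sync::mpsc` channel stands in for the body stream and `recv_timeout` for `tokio::time::timeout`; `ReadOutcome` and `read_with_deadline` are hypothetical names. Unlike the example above, it keeps a byte count instead of accumulating the data.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// How a read loop ended, with the number of bytes seen so far.
#[derive(Debug, PartialEq)]
enum ReadOutcome {
    /// The sender closed the channel: the "stream" is complete.
    Complete(usize),
    /// No chunk arrived within the per-chunk deadline.
    TimedOut(usize),
}

/// Reads chunks until the sender disconnects or one chunk takes longer
/// than `per_chunk` to arrive.
fn read_with_deadline(rx: &Receiver<Vec<u8>>, per_chunk: Duration) -> ReadOutcome {
    let mut total = 0;
    loop {
        match rx.recv_timeout(per_chunk) {
            Ok(chunk) => total += chunk.len(),
            Err(RecvTimeoutError::Disconnected) => return ReadOutcome::Complete(total),
            Err(RecvTimeoutError::Timeout) => return ReadOutcome::TimedOut(total),
        }
    }
}
```

The deadline restarts for every chunk, so a slow but steady sender is not penalized. A cap on the total transfer time would need a separate overall deadline.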
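// BodyDataStream is provided by the http-body-util crate and is generic over the body type;
// this alias fixes it to hyper's Incoming body so the signatures below stay short.
type BodyDataStream = http_body_util::BodyDataStream<hyper::body::Incoming>;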
use futures::StreamExt;
async fn memory_conscious_streaming(mut stream: BodyDataStream) {
// BodyDataStream yields Bytes, which is reference-counted
// This allows zero-copy operations on chunk contents
while let Some(Ok(chunk)) = stream.next().await {
// chunk is Bytes - reference counted, may share allocation
// Don't accumulate without bound
// let mut accumulated = Vec::new();
// accumulated.extend_from_slice(&chunk); // BAD: unbounded growth
// Process and discard (or process in place)
process_and_discard(chunk);
// chunk is dropped here, memory released
}
}
fn process_and_discard(bytes: hyper::body::Bytes) {
// Process bytes without accumulating
// Maybe write to file, send elsewhere, or compute aggregate
println!("Processed {} bytes", bytes.len());
// bytes dropped at end of scope
}

Process chunks incrementally to avoid memory accumulation.
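One way to "compute an aggregate" without accumulating is to fold each chunk into a small running state. The sketch below is std-only and uses `std::io::Read` as a stand-in for the body stream; `summarize` is a hypothetical helper, and the additive checksum is just a placeholder for real processing.

```rust
use std::io::{self, Read};

/// Reads `source` in chunks of at most `chunk_size` bytes and folds each
/// chunk into a running (byte count, wrapping byte sum) pair, so memory
/// use stays at one buffer regardless of the input size.
fn summarize<R: Read>(mut source: R, chunk_size: usize) -> io::Result<(usize, u32)> {
    let mut buf = vec![0u8; chunk_size];
    let mut total = 0usize;
    let mut checksum = 0u32;
    loop {
        let n = source.read(&mut buf)?;
        if n == 0 {
            break; // End of input (or a zero-sized buffer)
        }
        total += n;
        for &byte in &buf[..n] {
            checksum = checksum.wrapping_add(u32::from(byte));
        }
        // The buffer is reused on the next iteration; nothing accumulates.
    }
    Ok((total, checksum))
}
```

For example, `summarize(&data[..], 4096)` walks a byte slice of any length while holding only one 4 KiB buffer. The same shape applies when each `Bytes` chunk from the stream is folded into an aggregate and then dropped.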
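// BodyDataStream is provided by the http-body-util crate and is generic over the body type;
// this alias fixes it to hyper's Incoming body so the signatures below stay short.
type BodyDataStream = http_body_util::BodyDataStream<hyper::body::Incoming>;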
use futures::stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use hyper::body::Bytes;
// Custom stream that respects backpressure
struct SlowStream {
source: BodyDataStream,
delay: std::time::Duration,
}
impl Stream for SlowStream {
type Item = Result<Bytes, hyper::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>
{
// This is called when downstream wants data
// We control when we actually poll the source
// If we want to add rate limiting:
// - Use a timer to delay polling
// - This propagates backpressure by polling less often
// For simplicity, just delegate to source
Pin::new(&mut self.source).poll_next(cx)
}
}
// Usage:
async fn use_custom_stream(stream: BodyDataStream) {
let slow_stream = SlowStream {
source: stream,
delay: std::time::Duration::from_millis(100),
};
// Backpressure still works because our poll_next
// only produces when we poll the source
}

Custom streams must poll upstream only when downstream requests.
Backpressure propagation chain:
| Layer | Mechanism | Effect |
|-------|-----------|--------|
| Application | stream.next().await rate | Controls polling frequency |
| Stream | poll_next calls | Produces a chunk only when polled |
| Hyper | Internal buffering | Buffers until polled |
| HTTP/2 | Stream window | WINDOW_UPDATE when read |
| HTTP/1.1 | TCP window | Zero window when buffer full |
| Network | Socket buffer | Fills when application doesn't read |
Backpressure characteristics:
| Protocol | Flow Control | Granularity |
|----------|-------------|-------------|
| HTTP/1.1 | TCP window | Per connection |
| HTTP/2 | Stream window | Per stream |
| WebSocket | TCP + frame | Per connection |
| gRPC | HTTP/2 stream | Per RPC |
Key insight: BodyDataStream implements backpressure through the fundamental semantics of Rust's Stream trait: data is pulled, not pushed. The consumer must explicitly call poll_next (via .next().await or iteration) to receive data, and this explicitness creates natural backpressure: if you're not asking for data, you're not getting it.

This is deeply integrated with both HTTP/1.1 and HTTP/2. In HTTP/1.1, not reading from the socket fills the TCP receive window, causing the sender's writes to block. In HTTP/2, reading sends WINDOW_UPDATE frames that advertise available buffer space, and not reading means no updates, causing the sender's window to exhaust.

The practical implication is that streaming in Rust with BodyDataStream is safe by default: you can't accidentally overwhelm your application because data only arrives when you ask for it. However, this guarantee can be broken by introducing unbounded buffering between the stream and your processing: using mpsc::unbounded_channel, collecting into a Vec, or spawning a task that reads without backpressure. The correct pattern is to maintain the pull-based semantics throughout your pipeline: use bounded channels for inter-task communication, process chunks incrementally rather than accumulating, and let the natural backpressure flow from your processing speed all the way back to the network layer.