What is the difference between http_body_util::BodyDataStream and hyper::body::Incoming for streaming request bodies?
Incoming is the concrete body type hyper hands you for bodies read from the network: request bodies on the server and response bodies on the client. It implements the Body trait, so polling it yields frames, each of which is either a chunk of data or a trailers map; it does not implement Stream itself. BodyDataStream lives in http_body_util (not in hyper::body) and is a thin generic adapter returned by BodyExt::into_data_stream(). Wrapping an Incoming gives a BodyDataStream<Incoming>, which implements Stream<Item = Result<Bytes, hyper::Error>>: it yields the bytes of each data frame and skips trailers frames. It adds no timeout or size-limit handling of its own; those have to be layered on separately.
Understanding Incoming Bodies
use hyper::body::{Body, Incoming}; // Body brings size_hint() into scope
use hyper::{Request, Response};
use http_body_util::BodyExt;
async fn handle_request(req: Request<Incoming>) -> Result<Response<String>, hyper::Error> {
// Incoming is the default body type for hyper server requests
// It represents the streaming body from the network
// Inspect the size bounds without consuming the body
println!("Body size hint: {:?}", req.body().size_hint());
// Collect the entire body (consumes the stream)
let body_bytes = req.collect().await?.to_bytes();
let body_str = String::from_utf8_lossy(&body_bytes);
Ok(Response::new(format!("Received: {}", body_str)))
}
#[tokio::main]
async fn main() {
// Incoming is what hyper::server provides
// It's a stream of incoming data frames
}
Incoming is the body type hyper gives you for data read from the connection.
BodyDataStream for Controlled Streaming
use http_body_util::{BodyDataStream, BodyExt};
use hyper::body::Incoming;
use hyper::Request;
use tokio_stream::StreamExt;
async fn stream_body_example(req: Request<Incoming>) -> Result<String, hyper::Error> {
// into_data_stream() comes from BodyExt and wraps Incoming in a BodyDataStream
let mut stream: BodyDataStream<Incoming> = req.into_body().into_data_stream();
// Now we have a Stream<Item = Result<Bytes, hyper::Error>>
let mut total_size = 0;
let mut chunks = Vec::new();
while let Some(chunk_result) = stream.next().await {
match chunk_result {
Ok(bytes) => {
total_size += bytes.len();
chunks.push(bytes);
}
Err(e) => {
eprintln!("Error receiving chunk: {}", e);
return Err(e);
}
}
}
Ok(format!("Received {} bytes in {} chunks", total_size, chunks.len()))
}
BodyDataStream converts the body into a stream of byte chunks.
When to Use Each Type
use http_body_util::BodyExt;
use hyper::body::{Body, Incoming}; // Body provides size_hint()
use hyper::{Request, Response};
// Incoming: Direct body type from hyper server
// Use when:
// - You need to access body metadata (size_hint, is_end_stream)
// - Collecting the whole body with BodyExt::collect()
// - Passing body to another service
// BodyDataStream: Streaming interface
// Use when:
// - Processing chunks incrementally
// - Need Stream trait methods
// - Want more control over streaming behavior
async fn process_with_incoming(req: Request<Incoming>) -> Result<Response<String>, hyper::Error> {
// Incoming - use with BodyExt
let size_hint = req.body().size_hint();
println!("Expected size: {:?}", size_hint);
// Consume the body completely
let bytes = req.into_body().collect().await?.to_bytes();
Ok(Response::new(format!("Received {} bytes", bytes.len())))
}
async fn process_with_stream(req: Request<Incoming>) -> Result<Response<String>, hyper::Error> {
// BodyDataStream - use for streaming
let mut stream = req.into_body().into_data_stream();
let mut processed = 0;
// Process each chunk as it arrives
while let Some(result) = futures::StreamExt::next(&mut stream).await {
let chunk = result?;
processed += chunk.len();
println!("Processed chunk of {} bytes", chunk.len());
}
Ok(Response::new(format!("Total processed: {}", processed)))
}
Choose Incoming for metadata and collection, BodyDataStream for chunked processing.
Streaming Large Files
use futures::StreamExt;
use http_body_util::BodyExt; // provides into_data_stream()
use hyper::Request;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
async fn save_large_file(req: Request<hyper::body::Incoming>) -> Result<String, Box<dyn std::error::Error>> {
// For large uploads, we don't want to load everything into memory
// Use BodyDataStream to process incrementally
let mut stream = req.into_body().into_data_stream();
let mut file = File::create("uploaded_file.bin").await?;
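let mut total = 0;
let mut next_report = 1_000_000;
while let Some(result) = stream.next().await {
let chunk = result?;
file.write_all(&chunk).await?;
total += chunk.len();
// Chunk sizes are arbitrary, so report when a 1 MB boundary is crossed
// rather than testing for an exact multiple of 1 MB
if total >= next_report {
println!("Received {} MB", total / 1_000_000);
next_report = (total / 1_000_000 + 1) * 1_000_000;
}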
}
file.flush().await?;
Ok(format!("Saved {} bytes", total))
}
Streaming writes each chunk to disk as it arrives, so memory use does not grow with the size of the upload.
BodyExt Trait Methods
use hyper::body::{Body, Incoming}; // Body provides size_hint()
use hyper::Request;
use http_body_util::BodyExt;
async fn body_ext_methods(req: Request<Incoming>) -> Result<(), hyper::Error> {
let body = req.into_body();
// size_hint() - from the Body trait; reports size bounds without consuming the body
let hint = body.size_hint();
println!("Lower bound: {}", hint.lower());
println!("Upper bound: {:?}", hint.upper());
// collect() - Consume and collect all frames
let collected = body.collect().await?;
let bytes = collected.to_bytes();
println!("Total bytes: {}", bytes.len());
// Note: collect(), frame(), and into_data_stream() all come from BodyExt
// into_data_stream() is the one to use when you need the Stream trait specifically
Ok(())
}
BodyExt adds convenience methods such as collect() on top of the Body trait that Incoming implements.
Frame-Based vs Data Stream
use http_body_util::{BodyDataStream, BodyExt};
use hyper::body::Incoming;
use hyper::Request;
async fn frame_vs_stream_example(req: Request<Incoming>) -> Result<(), Box<dyn std::error::Error>> {
let body = req.into_body();
// Incoming implements Body, which yields Frames
// Frames can be data or trailers
// BodyDataStream gives us just the data (no trailers)
let _data_stream: BodyDataStream<Incoming> = body.into_data_stream();
// The difference:
// - Body/Incoming: yields Frame<Bytes> (data or trailers)
// - BodyDataStream: yields Bytes (just data)
Ok(())
}
// Example showing frame access
async fn process_frames(mut body: Incoming) -> Result<(), hyper::Error> {
// frame() comes from BodyExt and resolves to the next frame, or None at end of body
while let Some(frame_result) = body.frame().await {
// Propagate errors instead of silently ending the loop
let frame = frame_result?;
match frame.into_data() {
Ok(data) => println!("Data frame: {} bytes", data.len()),
// into_data() hands the frame back if it was not a data frame
Err(frame) => {
if let Ok(trailers) = frame.into_trailers() {
println!("Trailers: {:?}", trailers);
}
}
}
}
Ok(())
}
Body yields frames (data or trailers); BodyDataStream yields only data bytes.
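The frame-versus-data distinction can be shown without any hyper types. The following is a toy model only: `ToyFrame` and `data_only` are hypothetical names invented for this illustration and merely mimic the shape of a frame and of a data-only stream. It is a sketch of the filtering idea, not how the real adapter is implemented.

```rust
// Toy model only: this enum mimics the *shape* of an HTTP body frame.
// It is not the real http_body::Frame type.
#[derive(Debug, Clone, PartialEq)]
enum ToyFrame {
    Data(Vec<u8>),
    Trailers(Vec<(String, String)>),
}

/// Keeps the payload of every data frame and drops trailers frames,
/// which is the behaviour a data-only stream presents to its consumer.
fn data_only(frames: Vec<ToyFrame>) -> impl Iterator<Item = Vec<u8>> {
    frames.into_iter().filter_map(|frame| match frame {
        ToyFrame::Data(bytes) => Some(bytes),
        ToyFrame::Trailers(_) => None,
    })
}
```

A consumer that needs the trailers has to work at the frame level; once the frames have passed through a filter like this, that information is gone.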
Practical HTTP Handler Pattern
use hyper::body::{Body, Incoming}; // Body provides size_hint()
use hyper::{Request, Response, Method, StatusCode};
use http_body_util::BodyExt;
use futures::StreamExt;
async fn handle_upload(req: Request<Incoming>) -> Result<Response<String>, hyper::Error> {
match req.method() {
&Method::POST => {
// For small uploads, collect directly
let bytes = req.into_body().collect().await?.to_bytes();
Ok(Response::new(format!("Uploaded {} bytes", bytes.len())))
}
&Method::GET => {
Ok(Response::new("Use POST to upload".to_string()))
}
_ => {
let mut response = Response::new("Method not allowed".to_string());
*response.status_mut() = StatusCode::METHOD_NOT_ALLOWED;
Ok(response)
}
}
}
// For streaming large uploads
async fn handle_large_upload(req: Request<Incoming>) -> Result<Response<String>, hyper::Error> {
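// upper() is None when the length is unknown (e.g. chunked transfer encoding),
// so treat an unknown size as large rather than as zero
let upper_bound = req.body().size_hint().upper().unwrap_or(u64::MAX);
if upper_bound > 10_000_000 {
// Stream large or unknown-size bodies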
let mut stream = req.into_body().into_data_stream();
let mut count = 0;
while let Some(result) = stream.next().await {
let chunk = result?;
count += chunk.len();
// Process chunk...
}
Ok(Response::new(format!("Streamed {} bytes", count)))
} else {
// Collect small bodies
let bytes = req.into_body().collect().await?.to_bytes();
Ok(Response::new(format!("Collected {} bytes", bytes.len())))
}
}
Choose strategy based on expected body size.
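One detail matters when choosing by size: the upper bound of a size hint is absent whenever the length is not known in advance, for example with chunked transfer encoding. A minimal sketch of the decision in isolation follows, assuming that an unknown size should be handled like a large one. `Strategy` and `choose_strategy` are hypothetical names for this illustration, and the `upper` parameter stands in for the value returned by `size_hint().upper()`.

```rust
/// How a handler might decide to consume a body (illustrative only).
#[derive(Debug, PartialEq)]
enum Strategy {
    Collect,
    Stream,
}

/// `upper` stands in for `size_hint().upper()`: `None` means the size is
/// unknown, which this sketch treats the same as "too large to buffer".
fn choose_strategy(upper: Option<u64>, collect_limit: u64) -> Strategy {
    match upper {
        Some(n) if n <= collect_limit => Strategy::Collect,
        _ => Strategy::Stream,
    }
}
```

Defaulting an unknown size to streaming is the conservative choice, since a client that omits the length could otherwise push an unbounded body into memory.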
Combining with Timeouts
use futures::StreamExt;
use http_body_util::BodyExt; // provides into_data_stream()
use hyper::Request;
use tokio::time::{timeout, Duration};
async fn stream_with_timeout(req: Request<hyper::body::Incoming>) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
let mut stream = req.into_body().into_data_stream();
let mut combined_data = Vec::new();
loop {
// The timeout must wrap the wait for the next chunk; wrapping a chunk
// that has already been received would never time out
match timeout(Duration::from_secs(30), stream.next()).await {
Ok(Some(Ok(bytes))) => {
combined_data.extend_from_slice(&bytes);
}
Ok(Some(Err(e))) => {
return Err(e.into());
}
// End of body
Ok(None) => break,
Err(_) => {
return Err("Timeout waiting for data".into());
}
}
}
Ok(combined_data)
}
Wrap each wait for the next chunk in a timeout so that a stalled client cannot hold the handler open indefinitely.
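The important point about a per-chunk timeout is where it sits: around the wait for the next chunk, not around a chunk that has already arrived. As a synchronous analogy using only the standard library (not the async tokio API used in this article), the sketch below reads chunks from a channel with `recv_timeout`. The function name `read_all_with_timeout` is hypothetical.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Drains a channel of chunks, giving up if any single wait for the next
/// chunk exceeds `per_chunk`. A closed channel marks the end of the body.
fn read_all_with_timeout(
    rx: &Receiver<Vec<u8>>,
    per_chunk: Duration,
) -> Result<Vec<u8>, String> {
    let mut combined = Vec::new();
    loop {
        // The timeout covers the wait itself, so a stalled sender is detected.
        match rx.recv_timeout(per_chunk) {
            Ok(chunk) => combined.extend_from_slice(&chunk),
            Err(RecvTimeoutError::Disconnected) => return Ok(combined),
            Err(RecvTimeoutError::Timeout) => {
                return Err("timed out waiting for the next chunk".to_string());
            }
        }
    }
}
```

The same structure carries over to async code: the future being timed is the one that waits for the next item.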
Memory Efficiency Comparison
use futures::StreamExt;
use http_body_util::BodyExt;
use hyper::body::Incoming;
use hyper::Request;
async fn memory_comparison(req: Request<Incoming>) -> Result<(), hyper::Error> {
// Approach 1: Collect into Bytes (buffers entire body)
let bytes = req.into_body().collect().await?.to_bytes();
// Memory: O(n) where n is body size
println!("Collected {} bytes into memory", bytes.len());
// Approach 2 (streaming) needs its own request: a body can be consumed
// only once, and Incoming cannot be constructed by hand.
// See process_streaming, which holds one chunk at a time
// rather than O(n) for the entire body
Ok(())
}
// Real streaming with bounded memory
async fn process_streaming(
req: Request<Incoming>,
) -> Result<(), Box<dyn std::error::Error>> {
let mut stream = req.into_body().into_data_stream();
let mut processed = 0;
while let Some(result) = stream.next().await {
let chunk = result?;
// Process the chunk immediately, then let it drop; nothing accumulates
processed += chunk.len();
// Memory stays bounded
}
println!("Processed {} bytes", processed);
Ok(())
}
Streaming keeps memory bounded by the size of a single chunk; collecting buffers the whole body.
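The bounded-memory property comes from folding each chunk into a small running state and then dropping it. A minimal sketch using a plain iterator of chunks in place of a real body stream; `checksum_streaming` is a hypothetical helper, and the byte sum is only a stand-in for whatever per-chunk work a handler does.

```rust
/// Consumes chunks one at a time, keeping only a running length and a
/// wrapping byte sum. Each chunk is dropped at the end of its iteration,
/// so peak memory is one chunk plus two integers.
fn checksum_streaming<I>(chunks: I) -> (usize, u32)
where
    I: IntoIterator<Item = Vec<u8>>,
{
    let mut total_len = 0usize;
    let mut sum = 0u32;
    for chunk in chunks {
        total_len += chunk.len();
        for byte in &chunk {
            sum = sum.wrapping_add(u32::from(*byte));
        }
    }
    (total_len, sum)
}
```

Collecting first and summing afterwards would give the same result, but would hold every chunk in memory at once.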
Response Body Handling
use hyper::body::{Body, Incoming}; // Body provides size_hint()
use hyper::Response;
use http_body_util::BodyExt;
async fn handle_response_body(response: Response<Incoming>) -> Result<(), hyper::Error> {
// Incoming bodies work for responses too (e.g., from hyper client)
let status = response.status();
let headers = response.headers().clone();
// Check content-length for size hint
let hint = response.body().size_hint();
println!("Response size hint: {:?}", hint);
// Collect body
let body = response.into_body().collect().await?.to_bytes();
println!("Status: {}", status);
println!("Body length: {}", body.len());
Ok(())
}
// For streaming responses (e.g., SSE, large downloads)
async fn handle_streaming_response(response: Response<Incoming>) -> Result<(), hyper::Error> {
use futures::StreamExt;
let mut stream = response.into_body().into_data_stream();
// Process chunks as they arrive
while let Some(result) = stream.next().await {
let chunk = result?;
// Process chunk...
println!("Received {} bytes", chunk.len());
}
Ok(())
}
Both request and response bodies use Incoming with the same patterns.
Synthesis
Quick reference:
use futures::StreamExt;
use http_body_util::BodyExt;
use hyper::body::Incoming;
// Incoming: body type hyper produces for data read from the connection
// - Implements Body trait (poll_frame, size_hint)
// - Use with BodyExt::collect(), then Collected::to_bytes()
// - Access to trailers and metadata
// - Default body type for hyper server
// http_body_util::BodyDataStream: Streaming interface over body data
// - Implements Stream<Item = Result<Bytes, hyper::Error>> when wrapping Incoming
// - Use for incremental processing
// - Just data, no trailers
// - Create via incoming.into_data_stream()
// When to use Incoming directly:
// - Need size_hint()
// - Collecting the whole body with collect()
// - Need access to trailers
// - Simple collect-and-process pattern
// When to use BodyDataStream:
// - Processing large bodies incrementally
// - Need Stream trait methods
// - Memory-constrained environments
// - Real-time data processing
// Example patterns:
// Collect entire body (simple, loads all into memory)
async fn collect_pattern(body: Incoming) -> Result<hyper::body::Bytes, hyper::Error> {
body.collect().await.map(|c| c.to_bytes())
}
// Stream chunks (complex, bounded memory)
async fn stream_pattern(body: Incoming) -> Result<usize, hyper::Error> {
let mut stream = body.into_data_stream();
let mut total = 0;
while let Some(result) = stream.next().await {
total += result?.len();
}
Ok(total)
}
// Frame-based (for trailers)
async fn frame_pattern(mut body: Incoming) -> Result<(), hyper::Error> {
// frame() is provided by BodyExt
while let Some(frame) = body.frame().await {
let frame = frame?;
if frame.is_data() {
// Process data frame
} else if frame.is_trailers() {
// Process trailers
}
}
Ok(())
}
Key insight: Incoming and BodyDataStream serve different abstraction levels for the same underlying data. Incoming is hyper's body type for data read from the connection. It implements the Body trait, which yields frames (data or trailers) and provides metadata like size_hint(). BodyDataStream, from http_body_util, turns this into a simpler Stream<Item = Result<Bytes, hyper::Error>> that yields only data bytes and skips trailers. Use Incoming directly when you need size hints, trailers, or a simple collect(); call into_data_stream() when you need the Stream trait for incremental processing. The choice affects memory usage: collect().await?.to_bytes() loads the entire body into memory, while streaming through BodyDataStream processes one chunk at a time with bounded memory overhead.
