What is the difference between hyper::body::BodyDataStream and Incoming for streaming request bodies?

Incoming is hyper's built-in body type for incoming HTTP requests and responses, representing a stream of data frames that may arrive incrementally over the network. BodyDataStream (in hyper 1.x, this is Incoming itself when used as a stream, or specifically BodyDataStream as the stream type) represents the low-level stream interface for consuming body data chunk by chunk. The key distinction is that Incoming is the concrete body type hyper uses for incoming messages, while BodyDataStream is the stream implementation that yields Frame<Bytes> items. When you call .into_stream() on an Incoming body, you get a BodyDataStream that implements Stream<Item = Result<Frame<Bytes>, Error>>.

Understanding Incoming

use hyper::body::Incoming;
 
// Incoming is the body type for incoming HTTP messages
// It represents data that's arriving over the network
 
// In a hyper 1.x handler, you receive Incoming directly
async fn handle_request(
    req: hyper::Request<Incoming>,
) -> Result<hyper::Response<String>, hyper::Error> {
    // The request body is Incoming
    let body: Incoming = req.into_body();
    
    // Incoming is efficient - no copying until needed
    // It represents the network stream directly
    
    // You can convert to bytes
    let bytes = hyper::body::to_bytes(body).await?;
    println!("Received {} bytes", bytes.len());
    
    Ok(hyper::Response::new("OK".to_string()))
}

Incoming is hyper's default body type for received HTTP messages.

Incoming as a Stream

use hyper::body::{Incoming, Frame};
use futures::StreamExt;
 
async fn stream_body(body: Incoming) -> Result<(), Box<dyn std::error::Error>> {
    // Incoming implements Stream
    let mut stream = body;
    
    while let Some(frame) = stream.next().await {
        let frame = frame?;
        
        // Frame can be either data or trailers
        match frame.into_data() {
            Ok(data) => {
                println!("Received {} bytes", data.len());
                // Process data chunk
            }
            Err(frame) => {
                // Frame was trailers, not data
                if let Some(trailers) = frame.into_trailers() {
                    println!("Received trailers: {:?}", trailers);
                }
            }
        }
    }
    
    Ok(())
}

Incoming directly implements Stream, yielding Frame<Bytes> items.

BodyDataStream Type

use hyper::body::{BodyDataStream, Frame};
use futures::StreamExt;
 
// BodyDataStream is the stream type for body data
// It's what you get from Incoming::into_stream() or similar
 
async fn process_body_stream(
    stream: BodyDataStream,
) -> Result<(), Box<dyn std::error::Error>> {
    // BodyDataStream yields Result<Frame<Bytes>, Error>
    let mut stream = stream;
    
    while let Some(result) = stream.next().await {
        let frame = result?;
        
        if let Ok(data) = frame.into_data() {
            println!("Chunk: {} bytes", data.len());
        }
    }
    
    Ok(())
}
 
// BodyDataStream is often used interchangeably with Incoming
// since Incoming itself implements Stream

BodyDataStream is the explicit stream type for body data.

The Relationship Between Incoming and BodyDataStream

use hyper::body::{Incoming, BodyDataStream, Frame};
use http_body::Body;
 
// In hyper 1.x:
// - Incoming is the Body implementation
// - BodyDataStream is the stream representation
 
// Body trait provides the data method
async fn use_body_trait<B>(body: B) -> Result<(), Box<dyn std::error::Error>>
where
    B: http_body::Body,
{
    // The Body trait provides into_data method
    // that returns a stream of data frames
    
    // Incoming implements Body
    // When you iterate over Incoming, you get Frame<Bytes>
    
    Ok(())
}
 
// Key relationship:
// Incoming: Body type from hyper
// BodyDataStream: The stream of Frame<Bytes>
// Incoming implements Stream<Item = Result<Frame<Bytes>, Error>>

Incoming implements the Body trait and can be used as a stream directly.

Frame-Based Processing

use hyper::body::{Incoming, Frame};
use http_body::Body;
use futures::StreamExt;
 
async fn process_frames(body: Incoming) -> Result<(), Box<dyn std::error::Error>> {
    let mut body = body;
    
    // Body::poll_frame gives you frames (data or trailers)
    while let Some(frame) = body.frame().await {
        let frame = frame?;
        
        // Check frame type
        match frame.into_data() {
            Ok(data) => {
                // Data frame: process the bytes
                let content = String::from_utf8_lossy(&data);
                println!("Data: {}", content);
            }
            Err(frame) => {
                // Not data, check for trailers
                if let Some(trailers) = frame.into_trailers() {
                    println!("Trailers: {:?}", trailers);
                    // HTTP trailers come after body data
                }
            }
        }
    }
    
    Ok(())
}

Frame<Bytes> can contain either data chunks or HTTP trailers.

Consuming the Body

use hyper::body::{Incoming, Bytes};
use http_body::Body;
 
// Multiple ways to consume Incoming
 
// Method 1: Collect to bytes (loads entire body into memory)
async fn to_bytes_example(body: Incoming) -> Result<Vec<u8>, hyper::Error> {
    let bytes = hyper::body::to_bytes(body).await?;
    Ok(bytes.to_vec())
}
 
// Method 2: Stream chunk by chunk (memory efficient)
async fn stream_example(body: Incoming) -> Result<(), Box<dyn std::error::Error>> {
    use futures::StreamExt;
    
    let mut stream = body;
    let mut total = 0;
    
    while let Some(frame) = stream.next().await {
        let frame = frame?;
        if let Ok(data) = frame.into_data() {
            total += data.len();
        }
    }
    
    println!("Total bytes: {}", total);
    Ok(())
}
 
// Method 3: Use Body trait methods
async fn body_trait_example(body: Incoming) -> Result<(), Box<dyn std::error::Error>> {
    use http_body::Body;
    
    let mut body = body;
    
    // body.frame() returns the next frame
    while let Some(frame) = body.frame().await {
        let frame = frame?;
        if let Ok(data) = frame.into_data() {
            println!("Chunk size: {}", data.len());
        }
    }
    
    Ok(())
}

Choose streaming for large bodies, to_bytes for small ones.

Working with Frame

use hyper::body::{Incoming, Frame, Bytes};
use http_body::Body;
 
async fn frame_details(body: Incoming) -> Result<(), Box<dyn std::error::Error>> {
    let mut body = body;
    
    while let Some(frame) = body.frame().await {
        let frame: Frame<Bytes> = frame?;
        
        // Frame methods:
        // - into_data(): Returns Ok(Bytes) if data frame
        // - into_trailers(): Returns Option<HeaderMap> if trailers frame
        // - is_data(): Check if it's a data frame
        // - is_trailers(): Check if it's a trailers frame
        
        if frame.is_data() {
            let data = frame.into_data().unwrap();
            println!("Data frame: {} bytes", data.len());
        } else if frame.is_trailers() {
            let trailers = frame.into_trailers().unwrap();
            println!("Trailers frame");
            for (name, value) in trailers.iter() {
                println!("  {}: {:?}", name, value);
            }
        }
    }
    
    Ok(())
}

Frame<Bytes> encapsulates both data and trailer frames.

Integration with Axum

use axum::{
    extract::Body,
    http::StatusCode,
    response::IntoResponse,
};
use hyper::body::Incoming;
 
// In Axum, Body::into_inner() gives you Incoming
async fn axum_handler(body: Body) -> Result<impl IntoResponse, StatusCode> {
    // Body is Axum's wrapper around Incoming
    let incoming: Incoming = body.into_inner();
    
    // Now use Incoming methods
    let bytes = hyper::body::to_bytes(incoming)
        .await
        .map_err(|_| StatusCode::BAD_REQUEST)?;
    
    Ok(format!("Received {} bytes", bytes.len()))
}
 
// Or stream directly
async fn axum_stream(body: Body) -> Result<impl IntoResponse, StatusCode> {
    use futures::StreamExt;
    
    let incoming = body.into_inner();
    let mut stream = incoming;
    let mut total = 0;
    
    while let Some(frame) = stream.next().await {
        match frame {
            Ok(frame) => {
                if let Ok(data) = frame.into_data() {
                    total += data.len();
                }
            }
            Err(_) => return Err(StatusCode::BAD_REQUEST),
        }
    }
    
    Ok(format!("Total: {} bytes", total))
}

Axum wraps Incoming in its Body type.

Comparing Body Types

use hyper::body::{Incoming, Bytes};
use http_body::Body;
 
// hyper provides several body types:
 
// Incoming: Streaming body from network
// - Efficient, streams data as it arrives
// - No buffering by default
// - Used for incoming requests/responses
 
// Bytes: Full body in memory
// - Simple, owned bytes
// - Good for small payloads
// - Not streaming
 
// Full<T>: Body with known size
// - Contains complete data
// - Can be converted to/from Bytes
 
// Comparison:
// Incoming: Stream of frames, network-backed
// Bytes: Single contiguous buffer, owned
 
async fn compare_bodies(body: Incoming) -> Result<(), Box<dyn std::error::Error>> {
    // Incoming: streaming, memory efficient
    let size = estimate_body_size(&body).await?;
    
    if size < 1024 * 1024 {
        // Small body: collect to bytes
        let bytes = hyper::body::to_bytes(body).await?;
        process_small_body(&bytes);
    } else {
        // Large body: stream processing
        stream_process(body).await?;
    }
    
    Ok(())
}
 
async fn estimate_body_size(body: &Incoming) -> Result<Option<u64>, hyper::Error> {
    // Body::size_hint() gives estimated size if known
    Ok(body.size_hint().exact())
}
 
fn process_small_body(bytes: &Bytes) {
    println!("Small body: {} bytes", bytes.len());
}
 
async fn stream_process(mut body: Incoming) -> Result<(), Box<dyn std::error::Error>> {
    use futures::StreamExt;
    
    while let Some(frame) = body.next().await {
        let frame = frame?;
        if let Ok(data) = frame.into_data() {
            // Process each chunk
            println!("Chunk: {} bytes", data.len());
        }
    }
    
    Ok(())
}

Choose body type based on size and processing needs.

Size Hints

use hyper::body::Incoming;
use http_body::Body;
 
async fn check_size(body: &Incoming) {
    // Incoming provides size_hint via Body trait
    let hint = body.size_hint();
    
    // exact() returns Some(size) if Content-Length is known
    if let Some(exact_size) = hint.exact() {
        println!("Exact size: {} bytes", exact_size);
    } else {
        println!("Unknown size (chunked encoding)");
    }
    
    // lower() returns minimum size if known
    println!("Lower bound: {} bytes", hint.lower());
    
    // upper() returns maximum size if known
    // Returns None for unbounded streams
    println!("Upper bound: {:?}", hint.upper());
}
 
// Use size hint to decide strategy
async fn decide_strategy(body: Incoming) -> Result<(), Box<dyn std::error::Error>> {
    let hint = body.size_hint();
    
    match hint.exact() {
        Some(size) if size < 10_000_000 => {
            // Known, reasonable size: collect
            let bytes = hyper::body::to_bytes(body).await?;
            println!("Collected {} bytes", bytes.len());
        }
        Some(size) => {
            // Known but large: stream
            println!("Large body: {} bytes, streaming", size);
            stream_large_body(body).await?;
        }
        None => {
            // Unknown size: always stream
            println!("Unknown size, streaming");
            stream_large_body(body).await?;
        }
    }
    
    Ok(())
}
 
async fn stream_large_body(body: Incoming) -> Result<(), Box<dyn std::error::Error>> {
    use futures::StreamExt;
    let mut stream = body;
    let mut count = 0;
    
    while let Some(frame) = stream.next().await {
        let frame = frame?;
        if let Ok(data) = frame.into_data() {
            count += data.len();
        }
    }
    
    println!("Streamed {} total bytes", count);
    Ok(())
}

size_hint() helps decide between streaming and collecting.

Error Handling

use hyper::body::Incoming;
use futures::StreamExt;
 
async fn handle_errors(body: Incoming) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    let mut stream = body;
    let mut buffer = Vec::new();
    
    while let Some(result) = stream.next().await {
        match result {
            Ok(frame) => {
                if let Ok(data) = frame.into_data() {
                    buffer.extend_from_slice(&data);
                }
            }
            Err(e) => {
                // hyper::Error represents network/protocol errors
                eprintln!("Stream error: {}", e);
                return Err(e.into());
            }
        }
    }
    
    Ok(buffer)
}
 
// Common errors:
// - Connection reset by peer
// - Timeout during transfer
// - Invalid chunked encoding
// - Trailers without valid headers

Handle hyper::Error for network-level failures during streaming.

Practical Example: File Upload

use hyper::body::Incoming;
use http_body::Body;
use tokio::io::AsyncWriteExt;
use std::path::Path;
 
async fn save_upload(
    body: Incoming,
    dest: &Path,
) -> Result<u64, Box<dyn std::error::Error>> {
    use tokio::fs::File;
    use tokio::io::BufWriter;
    
    let mut file = BufWriter::new(File::create(dest).await?);
    let mut body = body;
    let mut total = 0;
    
    // Stream directly to file without loading entire body in memory
    while let Some(frame) = body.frame().await {
        let frame = frame?;
        
        if let Ok(data) = frame.into_data() {
            file.write_all(&data).await?;
            total += data.len() as u64;
        }
    }
    
    file.flush().await?;
    Ok(total)
}
 
// This approach:
// - Never loads entire file in memory
// - Works for multi-gigabyte uploads
// - Starts writing immediately as data arrives

Stream large uploads directly to storage without buffering.

Synthesis

Quick reference:

use hyper::body::{Incoming, Frame};
use http_body::Body;
use futures::StreamExt;
 
// Incoming: hyper's body type for incoming HTTP messages
// - Implements Body trait
// - Implements Stream<Item = Result<Frame<Bytes>, Error>>
// - Streams data as it arrives from network
 
// Frame<Bytes>: A single frame from the body
// - Can be data (into_data() -> Ok(Bytes))
// - Can be trailers (into_trailers() -> Option<HeaderMap>)
 
// Consuming the body:
 
// Option 1: Collect all at once (small bodies)
let bytes = hyper::body::to_bytes(body).await?;
 
// Option 2: Stream frame by frame (large bodies)
let mut body = body;
while let Some(frame) = body.frame().await {
    let frame = frame?;
    if let Ok(data) = frame.into_data() {
        // Process data chunk
    }
}
 
// Option 3: Use Stream trait directly
let mut stream = body;
while let Some(result) = stream.next().await {
    let frame = result?;
    // Handle frame
}
 
// Key points:
// - Incoming IS the stream (BodyDataStream is the implementation detail)
// - Use streaming for large/unknown size bodies
// - Use to_bytes for small bodies with known size
// - Frame can be data or trailers
// - size_hint() helps decide streaming vs. collecting

Key insight: Incoming is hyper's primary body type for received HTTP messages, implementing both the Body trait and the Stream trait. The distinction between Incoming and BodyDataStream is largely implementation detail—Incoming itself can be iterated as a stream of Frame<Bytes> items. Each frame contains either data bytes or HTTP trailers. For small bodies with known sizes, hyper::body::to_bytes() is convenient. For large bodies or streaming processing, iterate Incoming directly using .frame() or the Stream trait. The frame-based approach is essential for handling HTTP trailers (headers that come after the body) and for memory-efficient processing of large uploads or responses. Always consider size_hint() to decide between buffering and streaming strategies.