What is the purpose of hyper::body::BodyExt::collect for aggregating streaming body data?

BodyExt::collect aggregates an asynchronous HTTP body stream into a contiguous buffer, converting a streaming body into collected bytes that can be processed as a single unit. This provides a convenient way to buffer entire request or response bodies when streaming processing isn't necessary, at the cost of holding all data in memory.

The BodyExt Trait

use hyper::body::{Body, Bytes, BodyExt};
 
// BodyExt is an extension trait for Body types
// NOTE(review): this is a simplified sketch, not the verbatim upstream
// declaration. In hyper 1.x the trait appears to be published by the
// `http-body-util` crate, and `collect` returns a named future instead of
// being an `async fn` — confirm against the upstream docs before copying.
pub trait BodyExt: Body {
    // Consumes the body and yields either the buffered result or the body's
    // own error type.
    async fn collect(self) -> Result<Collected, Self::Error>;
    
    // ... other methods
}
 
// Collected is a buffer of accumulated bytes
// Placeholder declaration: the fields are intentionally omitted in this
// sketch, so nothing about the internal layout can be relied upon here.
// NOTE(review): the upstream type is presumably generic over the buffer type
// (e.g. `Collected<Bytes>`) — confirm.
pub struct Collected {
    // Internal representation of collected bytes
}

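BodyExt extends the Body trait with combinators, including collect for buffering. Note that in hyper 1.x both BodyExt and Collected are provided by the companion http-body-util crate (http_body_util::BodyExt, http_body_util::Collected) rather than by hyper::body itself, so the use lines in the examples below must be adjusted accordingly; Collected is also generic over the buffer type (for example Collected<Bytes>).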

Basic collect Usage

use hyper::body::BodyExt;
use hyper::Response;
use hyper::body::Incoming;
 
/// Buffers the whole response body and hands it back as an owned byte vector.
///
/// Returns the body's error (boxed) if the stream fails before it ends.
async fn basic_collect(response: Response<Incoming>) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    // Drive the stream to completion, flatten the buffered chunks into a
    // single `Bytes`, then copy that into a `Vec<u8>`.
    let body = response.collect().await?.to_bytes();
    Ok(body.to_vec())
}

collect awaits the entire body stream and aggregates all chunks into a buffer.

How collect Works Internally

use hyper::body::{Body, BodyExt, Bytes};
 
// Conceptual implementation of collect.
//
// Polls the body frame by frame until the stream ends, appending every data
// frame to the buffer. Any body error aborts collection and is returned as-is.
async fn collect_implementation<B: Body>(body: B) -> Result<Collected, B::Error> {
    let mut collected = Collected::new();

    // `frame()` borrows the body mutably and requires it to be `Unpin`;
    // pinning the body on the stack satisfies both for any `B`.
    let mut body = std::pin::pin!(body);

    // Pull frames until the body signals end-of-stream with `None`.
    while let Some(frame) = body.frame().await {
        let frame = frame?;
        // `into_data` returns the frame back in `Err` when it is not a data
        // frame, so non-data frames simply fall through here.
        if let Ok(data) = frame.into_data() {
            collected.extend(&data);
        }
        // This sketch skips trailer frames; the real implementation stores
        // them so they can be read back via `Collected::trailers()`.
    }

    Ok(collected)
}
 
// Collected internally maintains a buffer of Bytes chunks
// NOTE(review): this layout is illustrative only; the upstream type presumably
// also keeps any received trailers alongside the data — confirm before relying
// on these field names.
pub struct Collected {
    // Received data chunks, in arrival order.
    chunks: Vec<Bytes>,
    // Running total of bytes across all chunks.
    total_len: usize,
}

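collect drains every frame of the body: data frames are appended to the buffer, and a trailers frame, if one arrives, is retained and can be read afterwards with Collected::trailers().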

Collected Buffer Operations

use hyper::body::BodyExt;
use hyper::body::{Bytes, Collected};
 
async fn collected_operations() {
    let collected: Collected = /* ... */;
    
    // Convert to contiguous Bytes
    let bytes: Bytes = collected.to_bytes();
    
    // Iterate over chunks without copying
    for chunk in collected.iter() {
        println!("chunk: {:?}", chunk);
    }
    
    // Get total length
    let len: usize = collected.len();
    
    // Check if empty
    let is_empty: bool = collected.is_empty();
}

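Collected exposes the buffered data through to_bytes() (a single contiguous Bytes) and aggregate() (a non-contiguous Buf view that avoids copying), plus trailers() for any trailer headers. Both to_bytes() and aggregate() consume the Collected value, so call trailers() first if you need it.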

Converting to String

use hyper::body::BodyExt;
use hyper::Response;
use hyper::body::Incoming;
 
/// Buffers the response body and decodes it as UTF-8 text.
///
/// Fails if the body stream errors or the payload is not valid UTF-8.
async fn body_to_string(response: Response<Incoming>) -> Result<String, Box<dyn std::error::Error>> {
    // Copy the buffered payload into a Vec so `String::from_utf8` can take
    // ownership of it.
    let raw = response.collect().await?.to_bytes().to_vec();
    Ok(String::from_utf8(raw)?)
}
 
// Alternative: validate the buffered bytes in place with `std::str::from_utf8`
// and allocate the String only after validation succeeds.
async fn body_to_string_alt(response: Response<Incoming>) -> Result<String, Box<dyn std::error::Error>> {
    let payload = response.collect().await?.to_bytes();

    // Borrow the payload as `&str`; invalid UTF-8 is reported before any
    // String allocation happens.
    let text: &str = std::str::from_utf8(&payload)?;

    Ok(text.to_owned())
}

collect followed by UTF-8 conversion is a common pattern for JSON and text bodies.

Memory Implications

use hyper::body::BodyExt;
 
async fn memory_example() {
    // collect holds ALL body data in memory
    // For small bodies: fine
    // For large bodies: may exhaust memory
    
    // Small body (safe)
    let small: Bytes = small_request.collect().await?.to_bytes();
    
    // Large body (potentially problematic)
    let large: Bytes = large_video_request.collect().await?.to_bytes();
    // All video data now in heap memory
    
    // Alternative: use streaming for large bodies
    while let Some(frame) = large_request.body_mut().frame().await {
        // Process chunk by chunk
        let chunk = frame?.into_data()?;
        // Stream to disk, process incrementally
    }
}

collect buffers everything; use streaming for large bodies.

Comparison with Streaming Processing

use hyper::body::BodyExt;
use hyper::body::Incoming;
use tokio::io::AsyncWriteExt;
 
// Using collect: the whole body is buffered before anything is returned.
async fn process_with_collect(
    body: Incoming,
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    let collected = body.collect().await?;
    let buffered = collected.to_bytes();
    Ok(buffered.to_vec())
}
 
// Using streaming: process incrementally
async fn process_streaming(
    body: Incoming,
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    use futures::StreamExt;
    
    let mut output = Vec::new();
    let mut stream = body;
    
    while let Some(chunk) = stream.frame().await {
        let data = chunk?.into_data()?;
        output.extend_from_slice(&data);
    }
    
    Ok(output)
}
 
// Streaming to file: minimal memory.
//
// Writes each data frame to `file` as soon as it arrives, so only one chunk
// is held in memory at a time. Body and I/O errors are propagated; non-data
// frames (such as trailers) are skipped.
async fn stream_to_file(
    mut body: Incoming,
    mut file: tokio::fs::File,
) -> Result<(), Box<dyn std::error::Error>> {
    // `frame()` needs `&mut` access to the body, hence the `mut` binding.
    while let Some(frame) = body.frame().await {
        // `into_data` yields `Err(frame)` for non-data frames; that is not a
        // failure, so it is matched instead of propagated.
        if let Ok(data) = frame?.into_data() {
            file.write_all(&data).await?;
        }
    }

    // Flush explicitly so buffered writes surface their errors here.
    file.flush().await?;
    Ok(())
}

Streaming allows incremental processing; collect requires buffering everything.

JSON Body Processing

use hyper::body::BodyExt;
use hyper::Response;
use hyper::body::Incoming;
use serde::Deserialize;
 
// Target type for the JSON examples below; populated by serde from the
// `name` and `email` keys of the decoded document.
#[derive(Deserialize)]
struct User {
    name: String,
    email: String,
}
 
/// Buffers the response body and deserializes it as a `User`.
///
/// Fails if the body stream errors or the payload is not valid JSON for `User`.
async fn parse_json_body(
    response: Response<Incoming>,
) -> Result<User, Box<dyn std::error::Error>> {
    // serde_json needs the complete document, so buffer everything first.
    let payload = response.collect().await?.to_bytes();
    Ok(serde_json::from_slice::<User>(&payload)?)
}
 
// For large JSON: consider an incremental parser instead of buffering.
async fn parse_large_json(
    response: Response<Incoming>,
) -> Result<User, Box<dyn std::error::Error>> {
    // For small to medium JSON, collect is fine.
    // For very large JSON, note that serde_json::from_reader expects a
    // blocking std::io::Read, so it cannot consume an async body directly;
    // incremental parsing needs a bridging adapter or a custom implementation.

    let collected = response.collect().await?;
    let payload = collected.to_bytes();
    let user = serde_json::from_slice::<User>(&payload)?;
    Ok(user)
}

JSON parsing typically requires the full body; collect is appropriate here.

Handling Trailers

use hyper::body::{BodyExt, Frame};
use hyper::Response;
use hyper::body::Incoming;
 
async fn handle_with_trailers(
    response: Response<Incoming>,
) -> Result<(), Box<dyn std::error::Error>> {
    // collect() ignores trailers
    let collected = response.collect().await?;
    let bytes = collected.to_bytes();
    
    // If you need trailers, process frames manually:
    let (parts, body) = response.into_parts();
    let mut body = body;
    
    let mut data = Vec::new();
    let mut trailers = None;
    
    while let Some(frame) = body.frame().await {
        let frame = frame?;
        match frame {
            Frame::Data(bytes) => data.extend_from_slice(&bytes),
            Frame::Trailers(t) => trailers = Some(t),
            _ => {}
        }
    }
    
    // Now you have both data and trailers
    Ok(())
}

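collect captures data frames and also retains trailers, which are available afterwards via Collected::trailers(); use frame iteration when you need to handle frames, including trailers, as they arrive.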

Integration with Response Types

use hyper::body::BodyExt;
use hyper::{Request, Response, body::Incoming};
 
/// Echo-style handler: buffers the request body, decodes it as UTF-8 and
/// replies with the text prefixed by "Received: ".
///
/// Fails if the body stream errors or the payload is not valid UTF-8.
async fn handle_request(
    req: Request<Incoming>,
) -> Result<Response<String>, Box<dyn std::error::Error>> {
    // Buffer the whole request body and take ownership of the raw bytes.
    let raw = req.collect().await?.to_bytes().to_vec();
    let text = String::from_utf8(raw)?;

    // Build the reply from the decoded text.
    Ok(Response::new(format!("Received: {}", text)))
}
 
// Generic over the body type: any body can be buffered the same way, and the
// body's own error type is returned unchanged.
async fn generic_body_handling<B>(body: B) -> Result<Bytes, B::Error>
where
    B: BodyExt + Body,
{
    let collected = body.collect().await?;
    Ok(collected.to_bytes())
}

collect works with any type implementing Body, because BodyExt is blanket-implemented for every Body.

Timeout Handling with collect

use hyper::body::BodyExt;
use hyper::body::Incoming;
use tokio::time::{timeout, Duration};
 
/// Buffers the body, giving up if it takes longer than `duration`.
///
/// Fails with the timer's elapsed error when the deadline passes, or with the
/// body's error when the stream itself fails.
async fn collect_with_timeout(
    body: Incoming,
    duration: Duration,
) -> Result<hyper::body::Bytes, Box<dyn std::error::Error>> {
    // Outer layer: did collection finish before the deadline?
    let finished = timeout(duration, body.collect()).await?;
    // Inner layer: did the body itself produce an error?
    let collected = finished?;

    Ok(collected.to_bytes())
}
 
/// Same idea as a timeout wrapper, expressed as a race between collection and
/// a fixed 30-second timer.
async fn collect_with_timeout_alt(
    body: Incoming,
) -> Result<hyper::body::Bytes, Box<dyn std::error::Error>> {
    // Alternative: use tokio::select for cancellation. Whichever branch
    // completes first wins, and the losing future is dropped.
    let pending = body.collect();

    tokio::select! {
        outcome = pending => match outcome {
            Ok(collected) => Ok(collected.to_bytes()),
            Err(err) => Err(err.into()),
        },
        _ = tokio::time::sleep(Duration::from_secs(30)) => Err("timeout".into()),
    }
}

Add timeouts to collect to prevent indefinite blocking on slow streams.

Error Handling Patterns

use hyper::body::BodyExt;
use hyper::body::Incoming;
use hyper::Error;
 
/// Buffers the body, reporting any stream failure as a descriptive string.
async fn error_handling(body: Incoming) -> Result<Vec<u8>, String> {
    // collect yields Result<Collected, Body::Error>; only the error arm needs
    // translating, because to_bytes itself cannot fail.
    match body.collect().await {
        Ok(collected) => Ok(collected.to_bytes().to_vec()),
        Err(e) => Err(format!("Failed to collect body: {}", e)),
    }
}
 
// Logs a failed collection to stderr and passes the original error through.
async fn handle_body_error(body: Incoming) -> Result<hyper::body::Bytes, hyper::Error> {
    body.collect()
        .await
        .map(|collected| collected.to_bytes())
        .map_err(|e| {
            // Body errors: IO errors, protocol errors, etc.
            eprintln!("Body collection failed: {}", e);
            e
        })
}

collect returns a Result with the body's error type.

Aggregate Body Size Limiting

use hyper::body::BodyExt;
use hyper::body::Incoming;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::Stream;
 
// Custom body wrapper that limits size
struct LimitedBody<B> {
    // The wrapped body whose frames are forwarded.
    inner: B,
    // Maximum number of body bytes intended to be allowed.
    limit: usize,
    // Running count of bytes seen so far (intended; see the Body impl).
    current: usize,
}
 
// No explicit `BodyExt` impl is needed (or permitted): `BodyExt` is
// blanket-implemented for every type that implements `Body`, so
// `LimitedBody<B>` gains `collect()` and `frame()` automatically from its
// `Body` impl, and a manual impl would be rejected as conflicting.
 
// Forwards frames from the wrapped body.
// NOTE(review): as written this is only a sketch — `limit` and `current` are
// never read or updated here, so no size cap is enforced yet.
impl<B: Body> Body for LimitedBody<B> {
    // Data and error types are passed through unchanged from the inner body.
    type Data = B::Data;
    type Error = B::Error;
    
    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
        // Implement size checking during frame polling
        // This is conceptual; full implementation is more complex
        // NOTE(review): `poll_frame` takes a pinned receiver, so delegating to
        // `inner` presumably needs a pin projection (or `B: Unpin`), and
        // reporting an exceeded limit would need an error type other than
        // `B::Error` — confirm before using this as a template.
        self.inner.poll_frame(cx)
    }
}
 
// Alternative: enforce the limit while reading, frame by frame.
//
// Buffers the body into a Vec but stops as soon as the running total would
// exceed `max_size`, so an oversized body is rejected before it is held in
// memory. Returns the body error (as a string) if the stream fails, or a
// "Body too large" message whose first number is the byte count reached when
// the limit was crossed.
async fn collect_with_limit(
    mut body: Incoming,
    max_size: usize,
) -> Result<Vec<u8>, String> {
    let mut buffered = Vec::new();

    while let Some(frame) = body.frame().await {
        let frame = frame.map_err(|e| e.to_string())?;

        // Non-data frames (e.g. trailers) do not count towards the limit.
        if let Ok(data) = frame.into_data() {
            // saturating_add keeps the comparison sound at the usize boundary.
            let new_len = buffered.len().saturating_add(data.len());
            if new_len > max_size {
                return Err(format!("Body too large: {} > {}", new_len, max_size));
            }
            buffered.extend_from_slice(&data);
        }
    }

    Ok(buffered)
}

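Apply size limits while streaming, so that an oversized body is rejected before it has been fully buffered; a check performed only after collect() can report the problem but does not protect memory. The http-body-util crate also provides a Limited body wrapper for exactly this purpose.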

Performance Characteristics

use hyper::body::BodyExt;
use hyper::body::Incoming;
 
async fn performance() {
    // collect() characteristics:
    
    // 1. Single allocation for contiguous buffer
    //    - to_bytes() may coalesce chunks into one allocation
    
    // 2. Minimal copies with Bytes
    //    - Bytes uses reference counting
    //    - Cloning is cheap
    
    // 3. Memory usage
    //    - Proportional to body size
    //    - No upper bound without limits
    
    // 4. Time complexity
    //    - O(n) for n bytes in body
    //    - Each frame is processed once
    
    // 5. Async behavior
    //    - Yields between frames
    //    - Non-blocking for network I/O
    
    let body: Incoming = /* ... */;
    let bytes = body.collect().await?.to_bytes();
    
    // bytes is contiguous, efficient for parsing
    // But holds all memory until dropped
}

collect is efficient for moderate-sized bodies; streaming is better for large data.

Using with hyper::Body (hyper 0.14 only; removed in hyper 1.0)

use hyper::body::BodyExt;
use hyper::Body;
 
// Note: hyper::Body and hyper::body::to_bytes belong to hyper 0.14; hyper 1.x
// replaced the type with hyper::body::Incoming. The buffering pattern itself
// is the same.

async fn legacy_body_example(body: Body) -> Result<Vec<u8>, hyper::Error> {
    // Or with BodyExt:
    // body.collect().await.map(|collected| collected.to_bytes().to_vec())
    hyper::body::to_bytes(body)
        .await
        .map(|bytes| bytes.to_vec())
}
 
// Modern approach with Incoming: buffer via collect(), then copy into a Vec.
async fn modern_body_example(body: hyper::body::Incoming) -> Result<Vec<u8>, hyper::Error> {
    let collected = body.collect().await?;
    Ok(collected.to_bytes().to_vec())
}

Both to_bytes and collect aggregate bodies; collect is more flexible.

Comparison Table

fn comparison() {
    // ┌─────────────────────────┬───────────────────────┬───────────────┬─────────┐
    // │ Method                  │ Use Case              │ Memory        │ Copy    │
    // ├─────────────────────────┼───────────────────────┼───────────────┼─────────┤
    // │ collect() + to_bytes()  │ Need all data at once │ O(body size)  │ Minimal │
    // │ frame().await           │ Stream processing     │ O(chunk size) │ None    │
    // │ hyper::body::to_bytes() │ Legacy (hyper 0.14)   │ O(body size)  │ Varies  │
    // │ collect() + aggregate() │ Non-contiguous view   │ O(body size)  │ None    │
    // └─────────────────────────┴───────────────────────┴───────────────┴─────────┘
}

Complete Example: HTTP Client

use hyper::body::BodyExt;
use hyper::{body::Incoming, Response};
use serde::Deserialize;
 
// Shape of the JSON payload decoded by the client examples below: a `status`
// string plus a `data` array of strings.
#[derive(Debug, Deserialize)]
struct ApiResponse {
    status: String,
    data: Vec<String>,
}
 
/// Buffers the response body and decodes it as an `ApiResponse`.
///
/// Fails if the body stream errors or the payload is not valid JSON for
/// `ApiResponse`.
async fn fetch_and_parse(
    response: Response<Incoming>,
) -> Result<ApiResponse, Box<dyn std::error::Error>> {
    // Steps 1 and 2: drain the stream and flatten it into contiguous bytes.
    let payload = response.collect().await?.to_bytes();

    // Step 3: decode the JSON document.
    Ok(serde_json::from_slice::<ApiResponse>(&payload)?)
}
 
// With error handling and a size limit enforced while reading.
//
// Reads the body frame by frame and aborts as soon as more than `max_size`
// bytes have arrived, so an oversized response is never fully buffered. Fails
// on body errors, on exceeding the limit, or on invalid JSON.
async fn fetch_and_parse_safe(
    response: Response<Incoming>,
    max_size: usize,
) -> Result<ApiResponse, Box<dyn std::error::Error>> {
    let mut body = response.into_body();
    let mut buffered = Vec::new();

    while let Some(frame) = body.frame().await {
        // Non-data frames (e.g. trailers) do not count towards the limit.
        if let Ok(data) = frame?.into_data() {
            // saturating_add keeps the comparison sound at the usize boundary.
            let new_len = buffered.len().saturating_add(data.len());
            if new_len > max_size {
                // The full size is unknown at this point, so report the cap.
                return Err(format!("Response too large: more than {} bytes", max_size).into());
            }
            buffered.extend_from_slice(&data);
        }
    }

    let api_response: ApiResponse = serde_json::from_slice(&buffered)
        .map_err(|e| format!("JSON parse error: {}", e))?;

    Ok(api_response)
}
 
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Simulated response handling: print the two rules of thumb in order.
    const GUIDANCE: [&str; 2] = [
        "Use collect() when you need the entire body at once.",
        "Use frame iteration for streaming large bodies.",
    ];
    for line in GUIDANCE.iter() {
        println!("{}", line);
    }
    Ok(())
}

When to Use collect vs. Streaming

use hyper::body::BodyExt;
use hyper::body::Incoming;
 
async fn decision_guide(body: Incoming) -> Result<(), Box<dyn std::error::Error>> {
    // USE COLLECT WHEN:
    // - Body is small to medium size
    // - You need all data at once (JSON parsing, string operations)
    // - Simplicity is more important than memory efficiency
    // - You need random access to the data
    
    // USE STREAMING WHEN:
    // - Body is large (files, video, large JSON)
    // - Memory is constrained
    // - You can process incrementally
    // - You need to stream to disk or another destination
    // - You want to react to trailers as they arrive (collect() only exposes
    //   them once the whole body has been received)
    
    // USE collect WITH LIMITS WHEN:
    // - Body size is unknown but potentially large
    // - You want to enforce maximum size
    // - You still need all data at once
    
    // Examples:
    
    // 1. API response JSON: collect is fine
    let json_bytes = body.collect().await?.to_bytes();
    
    // 2. File upload: stream to disk
    // while let Some(frame) = body.frame().await { /* write to file */ }
    
    // 3. Unknown size: enforce the cap while reading frames (or wrap the body
    //    in a size-limiting adapter); checking the length only after
    //    collect() is too late to protect memory.
    
    Ok(())
}

Summary

fn summary() {
    // ┌────────────────┬──────────────────────────────────────────┐
    // │ Aspect         │ Behavior                                 │
    // ├────────────────┼──────────────────────────────────────────┤
    // │ Purpose        │ Aggregate streaming body into buffer     │
    // │ Output         │ Collected: to_bytes(), aggregate()       │
    // │ Memory usage   │ Proportional to body size                │
    // │ Trailers       │ Kept; read via Collected::trailers()     │
    // │ Error handling │ Returns Result with body's Error type    │
    // │ Performance    │ O(n) time, O(n) memory                   │
    // │ Best for       │ Small/medium bodies, JSON, random access │
    // │ Avoid for      │ Large files, video, memory-constrained   │
    // └────────────────┴──────────────────────────────────────────┘
    
    // Key points:
    // 1. collect() buffers the entire body into memory
    // 2. Returns Collected with to_bytes() for contiguous access
    // 3. Simpler than manual frame iteration
    // 4. Holds all data in memory - not for large bodies
    // 5. Trailers are retained and exposed through Collected::trailers()
    // 6. Works with any type implementing Body (BodyExt is blanket-implemented)
    // 7. Returns Result, propagating body errors
    // 8. Use with timeouts for untrusted sources
    // 9. to_bytes() may coalesce chunks into one allocation
    // 10. Use streaming for large bodies or limited memory
}

Key insight: BodyExt::collect bridges the gap between HTTP's streaming model and applications that need complete body data. HTTP bodies arrive as a stream of frames (data chunks and optional trailers), but many operations—JSON parsing, string processing, cryptographic hashing—require the complete body. collect accumulates all data frames into a Collected buffer (retaining any trailers, which stay available through Collected::trailers()), and that buffer can then be converted to contiguous Bytes with to_bytes() or read as a non-contiguous buffer with aggregate(). The trade-off is memory: collect holds the entire body in heap memory, making it unsuitable for large files or memory-constrained environments. For streaming scenarios—writing uploads to disk, processing video, handling unlimited-size requests—use frame() iteration instead. Use collect when you genuinely need all data at once and the body size is bounded; for everything else, stream. Always consider adding size limits or timeouts when collecting untrusted bodies to prevent resource exhaustion.