What is the purpose of hyper::body::aggregate for efficiently collecting body chunks into a single buffer?

hyper::body::aggregate collects a streaming HTTP body into a single contiguous buffer, concatenating all chunks without unnecessary copies. Unlike to_bytes which also collects but may allocate a new buffer, aggregate returns an AggregatedBytes type that maintains references to the original chunk allocations where possible. This matters for performance when processing complete bodies that arrive as multiple frames—the function handles the complexity of chunk concatenation while minimizing memory overhead.

The Problem: Bodies Arrive as Chunks

use hyper::body::Body;
use http::Request;
 
// HTTP bodies in hyper are streams of "frames" containing data chunks
// The chunks arrive as they're received from the network
 
// A body might conceptually be one piece of data:
// "Hello, World! This is a long message..."
 
// But arrives as multiple chunks:
// Chunk 1: "Hello, World! "
// Chunk 2: "This is a "
// Chunk 3: "long message..."
 
// Working with multiple chunks is cumbersome:
// - Need to iterate over all chunks
// - Can't take a single slice reference
// - Need to copy if you need contiguous data

HTTP bodies are naturally chunked; aggregate handles collecting them efficiently.

Basic Usage of aggregate

use hyper::body::aggregate;
use http::Request;
use bytes::Buf;
 
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Simulate a request with a body that arrives in chunks
    let request = Request::builder()
        .method("POST")
        .uri("/upload")
        .body(hyper::Body::from("Hello, World!"))?;
    
    // aggregate collects all chunks into a single buffer
    let aggregated = aggregate(request.into_body()).await?;
    
    // Now we have a single contiguous buffer
    // aggregated is impl Buf (can read bytes from it)
    
    println!("Body length: {}", aggregated.remaining());
    
    // Convert to bytes for full access
    let bytes = hyper::body::to_bytes(&mut aggregated.clone()).await?;
    println!("Body content: {}", String::from_utf8_lossy(&bytes));
    
    Ok(())
}
 
// The key benefit: all chunks are now in one place
// You can take slices, iterate once, pass to parsers

aggregate takes ownership of the body and returns aggregated bytes.

AggregatedBytes vs Bytes

use hyper::body::aggregate;
use bytes::{Buf, Bytes};
 
#[tokio::main]
async fn example() -> Result<(), Box<dyn std::error::Error>> {
    // Create a body with multiple chunks
    let body = hyper::Body::from("Hello, World!");
    
    // aggregate returns AggregatedBytes
    let aggregated = aggregate(body).await?;
    
    // AggregatedBytes implements Buf
    // - Can iterate over bytes
    // - Can read chunks
    // - Can get remaining bytes
    
    // But AggregatedBytes is NOT Bytes
    // It's an internal type that may reference multiple allocations
    
    // Convert to Bytes if needed (may copy)
    let bytes = aggregated.copy_to_bytes(aggregated.remaining());
    
    // Bytes is a single contiguous buffer
    // - Can slice efficiently
    // - Can pass around easily
    // - Cheap to clone (reference counted)
    
    Ok(())
}
 
// The difference:
// - AggregatedBytes: result of aggregate(), may have internal chunks
// - Bytes: single contiguous buffer, can slice/clone cheaply
 
// Use AggregatedBytes when:
// - You need to process the body once
// - You're passing to something that takes impl Buf
// - Memory efficiency is critical
 
// Convert to Bytes when:
// - You need to slice
// - You need to keep the data around
// - You need reference-counted sharing

AggregatedBytes is optimized for the aggregation result; Bytes is for general use.

Comparison with to_bytes

use hyper::body::{aggregate, to_bytes};
use bytes::Buf;
 
#[tokio::main]
async fn compare() -> Result<(), Box<dyn std::error::Error>> {
    let body = hyper::Body::from("Hello, World!");
    
    // to_bytes: simpler, always returns Bytes
    let bytes = to_bytes(body).await?;
    // bytes is Bytes - single allocation
    
    let body2 = hyper::Body::from("Hello, World!");
    
    // aggregate: may avoid allocation
    let aggregated = aggregate(body2).await?;
    // aggregated may reference original chunks directly
    
    // When does aggregate avoid allocation?
    // - When body arrives as single chunk
    // - When chunks can be referenced without copy
    
    // When does aggregate still copy?
    // - When chunks are scattered in memory
    // - When converting to Bytes explicitly
    
    // to_bytes always:
    // - Collects all chunks
    // - May allocate new buffer for contiguous storage
    // - Returns Bytes (reference counted)
    
    // Use to_bytes when:
    // - You want the simpler API
    // - You need Bytes specifically
    // - Single allocation doesn't matter
    
    // Use aggregate when:
    // - You want to potentially avoid allocation
    // - You're passing to Buf consumer
    // - You're processing once and discarding
    
    Ok(())
}

to_bytes is simpler but always allocates; aggregate may avoid allocation.

Processing with Buf Trait

use hyper::body::aggregate;
use bytes::Buf;
use std::io::Cursor;
 
#[tokio::main]
async fn process_with_buf() -> Result<(), Box<dyn std::error::Error>> {
    let body = hyper::Body::from("Hello, World!");
    
    let mut aggregated = aggregate(body).await?;
    
    // AggregatedBytes implements Buf
    // You can use all Buf methods:
    
    // Get remaining length
    println!("Remaining bytes: {}", aggregated.remaining());
    
    // Read bytes into a slice
    let mut buf = [0u8; 5];
    aggregated.copy_to_slice(&mut buf);
    println!("First 5 bytes: {:?}", String::from_utf8_lossy(&buf));
    
    // Get a byte at a position
    // Note: get() works on the underlying bytes
    
    // Advance through the buffer
    aggregated.advance(1); // Skip one byte
    
    // Get remaining as contiguous bytes (may copy)
    if aggregated.has_remaining() {
        let remaining = aggregated.copy_to_bytes(aggregated.remaining());
        println!("Remaining: {:?}", String::from_utf8_lossy(&remaining));
    }
    
    Ok(())
}

AggregatedBytes implements Buf, providing rich byte manipulation methods.

Avoiding Unnecessary Copies

use hyper::body::aggregate;
use bytes::Buf;
 
#[tokio::main]
async fn efficient_processing() -> Result<(), Box<dyn std::error::Error>> {
    // When body arrives as single chunk:
    let single_chunk = hyper::Body::from("Single chunk body");
    let aggregated = aggregate(single_chunk).await?;
    
    // aggregate may just return reference to original allocation
    // No copy needed - just wrapping the existing bytes
    
    // When body has multiple chunks:
    let (sender, body) = hyper::Body::channel();
    
    // Simulate sending chunks
    sender.send_data(Bytes::from("Chunk 1")).await?;
    sender.send_data(Bytes::from("Chunk 2")).await?;
    sender.send_data(Bytes::from("Chunk 3")).await?;
    drop(sender); // Close sender
    
    let aggregated = aggregate(body).await?;
    
    // Now we have all chunks in one place
    // May have needed to copy to make contiguous
    // Or may reference multiple allocations internally
    
    // Key insight: aggregate collects efficiently
    // It doesn't necessarily make one big allocation
    // It creates an "aggregated view" over chunks
    
    // When you need truly contiguous:
    let contiguous = aggregated.copy_to_bytes(aggregated.remaining());
    // This may allocate and copy
    
    Ok(())
}
 
// Memory behavior:
// 1. Single chunk body -> aggregate references original, no copy
// 2. Multi-chunk body -> aggregate may reference multiple or copy
// 3. copy_to_bytes -> always provides contiguous, may copy

aggregate minimizes copies; use it when you don't strictly need Bytes.

Parsing Aggregated Bodies

use hyper::body::aggregate;
use bytes::Buf;
use serde::Deserialize;
 
#[derive(Deserialize)]
struct UserData {
    name: String,
    email: String,
}
 
#[tokio::main]
async fn parse_json() -> Result<(), Box<dyn std::error::Error>> {
    let json = r#"{"name":"Alice","email":"alice@example.com"}"#;
    let body = hyper::Body::from(json);
    
    // Aggregate the body
    let aggregated = aggregate(body).await?;
    
    // For JSON parsing, we need contiguous bytes
    // serde_json::from_reader would work too
    
    // Option 1: copy_to_bytes for contiguous slice
    let bytes = aggregated.copy_to_bytes(aggregated.remaining());
    let user: UserData = serde_json::from_slice(&bytes)?;
    println!("User: {} ({})", user.name, user.email);
    
    // Note: for JSON, you might use from_reader directly:
    // serde_json::from_reader(aggregated.reader())
    // This avoids copying for large bodies
    
    Ok(())
}
 
// Alternative: use reader() for streaming parsers
fn process_with_reader(aggregated: impl Buf) {
    use std::io::Read;
    
    let mut reader = aggregated.reader();
    let mut buffer = String::new();
    reader.read_to_string(&mut buffer).unwrap();
    println!("Read: {}", buffer);
}

aggregate works well with parsers that accept Buf or readers.

Handling Large Bodies

use hyper::body::aggregate;
use bytes::Buf;
 
#[tokio::main]
async fn handle_large_body() -> Result<(), Box<dyn std::error::Error>> {
    // For very large bodies, aggregate still loads everything into memory
    // This is the trade-off: convenience vs memory
    
    // DON'T use aggregate for:
    // - Streaming uploads (video, large files)
    // - Bodies larger than available memory
    // - When you want to process incrementally
    
    // DO use aggregate for:
    // - Small to medium bodies
    // - When you need the whole body for parsing
    // - When memory isn't a concern
    
    // Alternative for large bodies: stream processing
    use futures::stream::StreamExt;
    
    let (sender, body) = hyper::Body::channel();
    
    // Stream processing (doesn't load everything)
    let stream = body.into_stream();
    futures::pin_mut!(stream);
    
    while let Some(chunk) = stream.next().await {
        let chunk = chunk?;
        // Process each chunk as it arrives
        println!("Got chunk of {} bytes", chunk.remaining());
    }
    
    // This approach:
    // - Uses constant memory
    // - Processes incrementally
    // - Can't access all data at once
    
    Ok(())
}

aggregate loads everything; use streaming for large bodies.

Working with Request and Response Bodies

use hyper::{body::aggregate, Request, Response, Body};
use bytes::Buf;
 
#[tokio::main]
async fn handle_request() -> Result<(), Box<dyn std::error::Error>> {
    // Common pattern: aggregate request body for processing
    
    let request = Request::builder()
        .method("POST")
        .uri("/api/data")
        .header("content-type", "application/json")
        .body(Body::from(r#"{"action":"create","id":42}"#))?;
    
    // Take the body
    let body = request.into_body();
    
    // Aggregate it
    let aggregated = aggregate(body).await?;
    
    // Process the content
    let content = aggregated.copy_to_bytes(aggregated.remaining());
    println!("Request body: {:?}", String::from_utf8_lossy(&content));
    
    // Similarly for responses
    let response = Response::builder()
        .status(200)
        .body(Body::from("Success!"))?;
    
    let response_body = aggregate(response.into_body()).await?;
    println!("Response: {:?}", String::from_utf8_lossy(
        &response_body.copy_to_bytes(response_body.remaining())
    ));
    
    Ok(())
}

aggregate works the same for request and response bodies.

Error Handling

use hyper::body::aggregate;
use hyper::Error;
 
#[tokio::main]
async fn handle_errors() -> Result<(), Box<dyn std::error::Error>> {
    // aggregate can fail if:
    // - Body stream errors
    // - Network issues during collection
    // - Payload is too large (if limits configured)
    
    let body = hyper::Body::from("test");
    
    match aggregate(body).await {
        Ok(aggregated) => {
            println!("Successfully aggregated {} bytes", aggregated.remaining());
        }
        Err(e) => {
            // Handle specific errors
            if e.is::<Error>() {
                eprintln!("Hyper error: {}", e);
            }
            // Common errors:
            // - Disconnected during collection
            // - Timeout
            // - Payload limits exceeded
        }
    }
    
    Ok(())
}

aggregate returns errors from the underlying body stream.

Synthesis

Quick reference:

use hyper::body::aggregate;
use bytes::Buf;
 
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // aggregate: collect body chunks into single buffer
    let body = hyper::Body::from("Hello, World!");
    let aggregated = aggregate(body).await?;
    
    // Returns AggregatedBytes (impl Buf)
    // - May reference original chunk allocations
    // - May have multiple internal chunks
    // - Efficient for single-pass processing
    
    // Key methods on AggregatedBytes:
    // - .remaining() -> number of bytes
    // - .copy_to_bytes(n) -> contiguous Bytes
    // - .reader() -> std::io::Read
    // - .chunk() -> &Bytes (current chunk)
    
    // vs to_bytes:
    // - to_bytes: simpler, always returns Bytes
    // - aggregate: may avoid allocation, returns Buf
    
    // Use aggregate when:
    // 1. Processing body once
    // 2. Passing to Buf consumer
    // 3. Want to minimize allocations
    // 4. Memory efficiency matters
    
    // Don't use for:
    // 1. Very large bodies (stream instead)
    // 2. When you need Bytes immediately (use to_bytes)
    // 3. When body size exceeds memory
    
    // Memory behavior:
    // - Single chunk: references original (no copy)
    // - Multiple chunks: collects efficiently
    // - copy_to_bytes: always provides contiguous
    
    Ok(())
}

Key insight: hyper::body::aggregate is the bridge between streaming HTTP bodies and code that needs complete data. It efficiently collects chunks into a workable form without necessarily copying all bytes into a new allocation. The AggregatedBytes type implements Buf, enabling direct use with Buf consumers or conversion to Bytes when contiguous data is required. Use aggregate for small to medium bodies where processing the complete content is simpler than streaming—JSON parsing, form data handling, and configuration reading are common use cases. For large uploads or streaming processing, avoid aggregate and process chunks as they arrive.