What is the purpose of hyper::body::BodyExt::collect for aggregating streaming body data?
BodyExt::collect aggregates an asynchronous HTTP body stream into a contiguous buffer, converting a sequence of data frames into collected bytes that can be processed as a single unit. In hyper 1.x the trait lives in the companion http-body-util crate and is blanket-implemented for every Body type. It provides a convenient way to buffer entire request or response bodies when streaming processing isn't necessary, at the cost of holding all data in memory.
The BodyExt Trait
// In hyper 1.x, BodyExt and Collected live in the http-body-util crate
use http_body_util::BodyExt;
use hyper::body::Body;
// BodyExt is an extension trait, blanket-implemented for every Body type
pub trait BodyExt: Body {
    // collect returns a future that resolves to
    // Result<Collected<Self::Data>, Self::Error>
    fn collect(self) -> combinators::Collect<Self>
    where
        Self: Sized;
    // ... other methods
}
// Collected is a buffer of accumulated data frames (plus any trailers)
pub struct Collected<B> {
    // Internal representation of collected chunks and trailers
}
BodyExt extends the Body trait with combinators, including collect for buffering.
Basic collect Usage
use http_body_util::BodyExt;
use hyper::Response;
use hyper::body::Incoming;
async fn basic_collect(response: Response<Incoming>) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    // Collect the streaming body into a buffer
    let collected = response.collect().await?;
    // Convert to contiguous bytes
    let bytes = collected.to_bytes();
    // Or copy into a Vec<u8>
    let vec = bytes.to_vec();
    Ok(vec)
}
collect awaits the entire body stream and aggregates all chunks into a buffer.
How collect Works Internally
use http_body_util::BodyExt;
use hyper::body::{Body, Bytes};
// Conceptual implementation of collect (the real one lives in http-body-util)
async fn collect_implementation<B>(mut body: B) -> Result<Collected<B::Data>, B::Error>
where
    B: Body + Unpin,
{
    let mut collected = Collected::default(); // conceptual constructor
    // Drive the frame stream until the body is exhausted
    while let Some(frame) = body.frame().await {
        let frame = frame?;
        if let Ok(data) = frame.into_data() {
            collected.push(data); // conceptual: append the chunk
        }
        // (the real implementation also captures trailer frames)
    }
    Ok(collected)
}
// Conceptually, Collected maintains a list of chunks plus any trailers
pub struct Collected {
    chunks: Vec<Bytes>,
    trailers: Option<http::HeaderMap>,
}
collect streams all frames, accumulating data frames and retaining any trailers.
Collected Buffer Operations
use http_body_util::{BodyExt, Collected};
use hyper::body::Bytes;
fn collected_operations(collected: Collected<Bytes>) {
    // Access trailers (if the body had any) before consuming the buffer
    if let Some(trailers) = collected.trailers() {
        println!("trailers: {:?}", trailers);
    }
    // Convert to contiguous Bytes (coalesces chunks only if there are several)
    let bytes: Bytes = collected.to_bytes();
    // Inspect the aggregated data
    let len: usize = bytes.len();
    let is_empty: bool = bytes.is_empty();
    println!("{} bytes, empty: {}", len, is_empty);
}
Collected exposes the aggregated data as contiguous Bytes and retains any trailers.
Converting to String
use http_body_util::BodyExt;
use hyper::Response;
use hyper::body::Incoming;
async fn body_to_string(response: Response<Incoming>) -> Result<String, Box<dyn std::error::Error>> {
    let bytes = response.collect().await?.to_bytes();
    // Convert bytes to an owned String (copies the data)
    let text = String::from_utf8(bytes.to_vec())?;
    Ok(text)
}
// Alternative: validate UTF-8 in place, then copy
async fn body_to_string_alt(response: Response<Incoming>) -> Result<String, Box<dyn std::error::Error>> {
    let bytes = response.collect().await?.to_bytes();
    // std::str::from_utf8 validates without an intermediate Vec
    let text = std::str::from_utf8(&bytes)?.to_string();
    Ok(text)
}
collect followed by UTF-8 conversion is a common pattern for JSON and text bodies.
Memory Implications
use http_body_util::BodyExt;
use hyper::body::Incoming;
// collect holds ALL body data in memory: fine for small bodies, but a
// large upload or video response can exhaust the heap, since collect
// imposes no size bound of its own.
async fn memory_example(mut large_body: Incoming) -> Result<(), Box<dyn std::error::Error>> {
    // Buffering (potentially problematic for large bodies):
    // let large = large_body.collect().await?.to_bytes();
    // Alternative: stream frame by frame instead
    while let Some(frame) = large_body.frame().await {
        if let Ok(chunk) = frame?.into_data() {
            // Stream to disk, hash, or process incrementally
            let _ = chunk;
        }
    }
    Ok(())
}
collect buffers everything; use streaming for large bodies.
Comparison with Streaming Processing
use http_body_util::BodyExt;
use hyper::body::Incoming;
use tokio::io::AsyncWriteExt;
// Using collect: buffer entire body
async fn process_with_collect(
    body: Incoming,
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    let bytes = body.collect().await?.to_bytes();
    Ok(bytes.to_vec())
}
// Using streaming: process frame by frame
async fn process_streaming(
    mut body: Incoming,
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    let mut output = Vec::new();
    while let Some(frame) = body.frame().await {
        if let Ok(data) = frame?.into_data() {
            output.extend_from_slice(&data);
        }
    }
    Ok(output)
}
// Streaming to file: minimal memory
async fn stream_to_file(
    mut body: Incoming,
    mut file: tokio::fs::File,
) -> Result<(), Box<dyn std::error::Error>> {
    while let Some(frame) = body.frame().await {
        if let Ok(data) = frame?.into_data() {
            file.write_all(&data).await?;
        }
    }
    file.flush().await?;
    Ok(())
}
Streaming allows incremental processing; collect requires buffering everything.
JSON Body Processing
use http_body_util::BodyExt;
use hyper::Response;
use hyper::body::Incoming;
use serde::Deserialize;
#[derive(Deserialize)]
struct User {
    name: String,
    email: String,
}
async fn parse_json_body(
    response: Response<Incoming>,
) -> Result<User, Box<dyn std::error::Error>> {
    // Collect entire body first
    let bytes = response.collect().await?.to_bytes();
    // Parse JSON from the collected bytes
    let user: User = serde_json::from_slice(&bytes)?;
    Ok(user)
}
// For very large JSON there is no built-in streaming path
async fn parse_large_json(
    response: Response<Incoming>,
) -> Result<User, Box<dyn std::error::Error>> {
    // For small to medium JSON, collect is fine. serde_json has no
    // async streaming deserializer, so very large JSON generally still
    // means buffering (ideally behind a size limit) or a custom
    // incremental parser.
    let bytes = response.collect().await?.to_bytes();
    let user: User = serde_json::from_slice(&bytes)?;
    Ok(user)
}
JSON parsing typically requires the full body; collect is appropriate here.
Handling Trailers
use http_body_util::BodyExt;
use hyper::Response;
use hyper::body::Incoming;
async fn handle_with_trailers(
    response: Response<Incoming>,
) -> Result<(), Box<dyn std::error::Error>> {
    // collect() captures trailers as well as data frames
    let collected = response.collect().await?;
    if let Some(trailers) = collected.trailers() {
        println!("trailers: {:?}", trailers);
    }
    let _bytes = collected.to_bytes();
    Ok(())
}
// Alternatively, process frames manually to see trailers as they arrive
async fn manual_trailers(
    mut body: Incoming,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut data = Vec::new();
    let mut trailers = None;
    while let Some(frame) = body.frame().await {
        match frame?.into_data() {
            Ok(bytes) => data.extend_from_slice(&bytes),
            // Not a data frame; check whether it is a trailers frame
            Err(frame) => {
                if let Ok(t) = frame.into_trailers() {
                    trailers = Some(t);
                }
            }
        }
    }
    // Now you have both data and trailers
    let _ = trailers;
    Ok(())
}
Collected retains both data frames and trailers; read them via Collected::trailers(), or iterate frames manually to process trailers as they arrive.
Integration with Response Types
use http_body_util::BodyExt;
use hyper::{Request, Response, body::{Body, Bytes, Incoming}};
async fn handle_request(
    req: Request<Incoming>,
) -> Result<Response<String>, Box<dyn std::error::Error>> {
    // Request<B> itself implements Body, so it can be collected directly
    let body_bytes = req.collect().await?.to_bytes();
    let body_str = String::from_utf8(body_bytes.to_vec())?;
    // Process the collected body
    let response_body = format!("Received: {}", body_str);
    Ok(Response::new(response_body))
}
// Any Body type supports collect via the blanket BodyExt impl
async fn generic_body_handling<B>(body: B) -> Result<Bytes, B::Error>
where
    B: Body,
{
    body.collect().await.map(|c| c.to_bytes())
}
collect works with any body type implementing Body, via the blanket BodyExt impl.
Timeout Handling with collect
use http_body_util::BodyExt;
use hyper::body::Incoming;
use tokio::time::{timeout, Duration};
async fn collect_with_timeout(
    body: Incoming,
    duration: Duration,
) -> Result<hyper::body::Bytes, Box<dyn std::error::Error>> {
    // Add a timeout to avoid hanging on slow or stalled streams;
    // the outer ? handles Elapsed, the inner ? handles body errors
    let collected = timeout(duration, body.collect()).await??;
    Ok(collected.to_bytes())
}
async fn collect_with_timeout_alt(
    body: Incoming,
) -> Result<hyper::body::Bytes, Box<dyn std::error::Error>> {
    // Alternative: use tokio::select! for cancellation
    tokio::select! {
        result = body.collect() => {
            Ok(result?.to_bytes())
        }
        _ = tokio::time::sleep(Duration::from_secs(30)) => {
            Err("timeout".into())
        }
    }
}
Add timeouts around collect to prevent waiting indefinitely on slow streams.
Error Handling Patterns
use http_body_util::BodyExt;
use hyper::body::Incoming;
async fn error_handling(body: Incoming) -> Result<Vec<u8>, String> {
    // collect returns Result<Collected<_>, the body's Error type>
    let collected = body.collect().await
        .map_err(|e| format!("Failed to collect body: {}", e))?;
    // to_bytes itself cannot fail
    let bytes = collected.to_bytes();
    // Copy into a Vec
    Ok(bytes.to_vec())
}
// Propagating the body's own error type
async fn handle_body_error(body: Incoming) -> Result<hyper::body::Bytes, hyper::Error> {
    match body.collect().await {
        Ok(collected) => Ok(collected.to_bytes()),
        Err(e) => {
            // Body errors: I/O errors, protocol errors, etc.
            eprintln!("Body collection failed: {}", e);
            Err(e)
        }
    }
}
collect returns a Result with the body's error type.
Aggregate Body Size Limiting
use http_body_util::{BodyExt, Limited};
use hyper::body::Incoming;
// http-body-util ships Limited, a Body wrapper that counts bytes as
// frames are polled and errors once the limit would be exceeded, so
// oversized data is never buffered
async fn collect_with_limit(
    body: Incoming,
    max_size: usize,
) -> Result<Vec<u8>, String> {
    let limited = Limited::new(body, max_size);
    let collected = limited.collect().await
        .map_err(|e| format!("collect failed or limit exceeded: {}", e))?;
    Ok(collected.to_bytes().to_vec())
}
// Alternative: check size after collection. Note this still buffers the
// whole body first, so it does not protect against oversized uploads.
async fn check_after_collect(
    body: Incoming,
    max_size: usize,
) -> Result<Vec<u8>, String> {
    let bytes = body.collect().await
        .map_err(|e| e.to_string())?
        .to_bytes();
    if bytes.len() > max_size {
        return Err(format!("Body too large: {} > {}", bytes.len(), max_size));
    }
    Ok(bytes.to_vec())
}
Apply size limits during streaming (Limited) rather than after collection when the body is untrusted.
Performance Characteristics
use http_body_util::BodyExt;
use hyper::body::Incoming;
async fn performance(body: Incoming) -> Result<(), hyper::Error> {
    // collect() characteristics:
    // 1. Allocation
    //    - to_bytes() returns a single chunk as-is; multiple chunks
    //      are coalesced into one allocation
    // 2. Minimal copies with Bytes
    //    - Bytes is reference-counted; cloning is cheap
    // 3. Memory usage
    //    - Proportional to body size, with no upper bound unless limited
    // 4. Time complexity
    //    - O(n) for n bytes in the body; each frame is processed once
    // 5. Async behavior
    //    - Yields between frames; non-blocking for network I/O
    let bytes = body.collect().await?.to_bytes();
    // bytes is contiguous and efficient to parse,
    // but holds the whole body in memory until dropped
    let _ = bytes;
    Ok(())
}
collect is efficient for moderate-sized bodies; streaming is better for large data.
Using with hyper 0.14's Body (legacy)
// In hyper 0.14, bodies were aggregated with hyper::body::to_bytes.
// Both hyper::Body and to_bytes were removed in hyper 1.0.
//
// hyper 0.14:
//     use hyper::Body;
//     let bytes = hyper::body::to_bytes(body).await?;
//
// Modern approach with hyper 1.x and http-body-util:
use http_body_util::BodyExt;
async fn modern_body_example(body: hyper::body::Incoming) -> Result<Vec<u8>, hyper::Error> {
    let bytes = body.collect().await?.to_bytes();
    Ok(bytes.to_vec())
}
Both to_bytes (0.14) and collect (1.x) aggregate bodies; collect also captures trailers and works with any Body type.
Comparison Table
fn comparison() {
    // +----------------------+-----------------------+---------------+---------+
    // | Method               | Use Case              | Memory        | Copy    |
    // +----------------------+-----------------------+---------------+---------+
    // | collect().to_bytes() | Need all data at once | O(body size)  | Minimal |
    // | frame().await        | Stream processing     | O(chunk size) | None    |
    // | to_bytes() (0.14)    | Legacy, all data      | O(body size)  | Varies  |
    // | Limited + collect()  | Size-capped buffering | O(limit)      | Minimal |
    // +----------------------+-----------------------+---------------+---------+
}
Complete Example: HTTP Client
use http_body_util::BodyExt;
use hyper::{body::Incoming, Response};
use serde::Deserialize;
#[derive(Debug, Deserialize)]
struct ApiResponse {
    status: String,
    data: Vec<String>,
}
async fn fetch_and_parse(
    response: Response<Incoming>,
) -> Result<ApiResponse, Box<dyn std::error::Error>> {
    // 1. Collect the streaming body
    let collected = response.collect().await?;
    // 2. Convert to contiguous bytes
    let bytes = collected.to_bytes();
    // 3. Parse JSON
    let api_response: ApiResponse = serde_json::from_slice(&bytes)?;
    Ok(api_response)
}
// With error handling and a post-collection size check
async fn fetch_and_parse_safe(
    response: Response<Incoming>,
    max_size: usize,
) -> Result<ApiResponse, Box<dyn std::error::Error>> {
    let bytes = response.collect().await?.to_bytes();
    // Check size (against untrusted servers, prefer wrapping the body
    // in http_body_util::Limited so oversized data is never buffered)
    if bytes.len() > max_size {
        return Err(format!("Response too large: {} bytes", bytes.len()).into());
    }
    let api_response: ApiResponse = serde_json::from_slice(&bytes)
        .map_err(|e| format!("JSON parse error: {}", e))?;
    Ok(api_response)
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Simulated response handling
    println!("Use collect() when you need the entire body at once.");
    println!("Use frame iteration for streaming large bodies.");
    Ok(())
}
When to Use collect vs. Streaming
use http_body_util::BodyExt;
use hyper::body::Incoming;
async fn decision_guide(body: Incoming) -> Result<(), Box<dyn std::error::Error>> {
    // USE COLLECT WHEN:
    // - Body is small to medium size
    // - You need all data at once (JSON parsing, string operations)
    // - Simplicity matters more than memory efficiency
    // - You need random access to the data
    // USE STREAMING WHEN:
    // - Body is large (files, video, large JSON)
    // - Memory is constrained
    // - You can process incrementally
    // - You need to stream to disk or another destination
    // - You want trailers as they arrive (collect also retains them)
    // USE collect WITH LIMITS WHEN:
    // - Body size is unknown but potentially large
    // - You want to enforce a maximum size
    // - You still need all data at once
    // Examples:
    // 1. API response JSON: collect is fine
    let json_bytes = body.collect().await?.to_bytes();
    let _ = json_bytes;
    // 2. File upload: stream to disk
    //    while let Some(frame) = body.frame().await { /* write to file */ }
    // 3. Unknown size: wrap in Limited, then collect
    //    Limited::new(body, MAX).collect().await?
    Ok(())
}
Summary
fn summary() {
    // +------------------+------------------------------------------------+
    // | Aspect           | Behavior                                       |
    // +------------------+------------------------------------------------+
    // | Purpose          | Aggregate streaming body into a buffer         |
    // | Output           | Collected type with to_bytes(), trailers()     |
    // | Memory usage     | Proportional to body size                      |
    // | Trailers         | Retained; read via Collected::trailers()       |
    // | Error handling   | Returns Result with body's Error type          |
    // | Performance      | O(n) time, O(n) memory                         |
    // | Best for         | Small/medium bodies, JSON, random access       |
    // | Avoid for        | Large files, video, memory-constrained systems |
    // +------------------+------------------------------------------------+
    // Key points:
    // 1. collect() buffers the entire body into memory
    // 2. Returns Collected, with to_bytes() for contiguous access
    // 3. Simpler than manual frame iteration
    // 4. Holds all data in memory - not for large bodies
    // 5. Trailers are retained and exposed via Collected::trailers()
    // 6. Works with any Body type via the blanket BodyExt impl
    // 7. Returns Result, propagating body errors
    // 8. Use with timeouts for untrusted sources
    // 9. to_bytes() may coalesce chunks into one allocation
    // 10. Use streaming (or Limited) for large bodies or limited memory
}
Key insight: BodyExt::collect bridges the gap between HTTP's streaming model and applications that need the complete body. HTTP bodies arrive as a stream of frames (data chunks and optional trailers), but many operations, such as JSON parsing, string processing, and cryptographic hashing, require the whole body. collect accumulates all data frames into a Collected buffer (retaining trailers along the way), which can then be converted to contiguous Bytes. The trade-off is memory: collect holds the entire body on the heap, making it unsuitable for large files or memory-constrained environments. For streaming scenarios, such as writing uploads to disk, processing video, or handling unbounded requests, use frame() iteration instead. Use collect when you genuinely need all data at once and the body size is bounded; for everything else, stream. Always add size limits (http_body_util::Limited) or timeouts when collecting untrusted bodies to prevent resource exhaustion.
