Rust walkthroughs
How does hyper::body::aggregate combine multiple data frames into a single contiguous buffer?

hyper::body::aggregate consumes a stream of data frames and concatenates them into a single Buf implementation, avoiding intermediate Vec<u8> allocations while still providing a contiguous view over all the data. HTTP bodies in hyper arrive as streams of data chunks, each potentially backed by a separate allocation. The aggregate function returns a Buf that presents these chunks as a single logical buffer through cursor-based traversal, internally maintaining references to each frame's data. This enables efficient parsing and processing of complete request bodies without copying the data into a single contiguous allocation, which would waste memory and CPU on large payloads.
use bytes::Bytes;
use futures::StreamExt;
use hyper::body::Body;
// hyper 0.14: Body implements futures::Stream with the "stream" feature enabled
async fn example() {
// HTTP bodies in hyper are streams of data chunks
let mut body: Body = Body::from("Hello, World!");
// Each chunk may be backed by a separate allocation
// Without aggregate, you process chunks individually:
while let Some(chunk) = body.next().await {
let chunk: Bytes = chunk.unwrap();
// Each chunk is separate
// Processing must handle discontinuity
}
}
HTTP bodies arrive as streams of frames, each potentially backed by separate memory.
use hyper::body::{aggregate, Body};
use bytes::Buf;
// The aggregate function signature:
// pub async fn aggregate<T>(body: T) -> Result<impl Buf, T::Error>
// where
// T: HttpBody,
async fn example(body: Body) -> Result<(), Box<dyn std::error::Error>> {
// aggregate consumes the body and returns a Buf
let buf = aggregate(body).await?;
// The returned Buf presents all frames as one logical buffer
// No copying occurs - it references original allocations
Ok(())
}
aggregate takes a body and returns a Buf implementation that logically concatenates all frames.
use hyper::body::{aggregate, Body};
use bytes::Buf;
async fn read_body(body: Body) -> Result<String, hyper::Error> {
// Combine all frames into a single Buf
let buf = aggregate(body).await?;
// The Buf can be read contiguously
let mut reader = buf.reader();
let mut contents = String::new();
std::io::Read::read_to_string(&mut reader, &mut contents).unwrap();
Ok(contents)
}
// Alternatively, using Buf methods directly:
async fn read_body_buf(body: Body) -> Result<Vec<u8>, hyper::Error> {
let mut buf = aggregate(body).await?;
// Collect all bytes
let mut vec = Vec::with_capacity(buf.remaining());
while buf.has_remaining() {
// Read chunk by chunk without copying to intermediate buffer
let chunk = buf.chunk();
vec.extend_from_slice(chunk);
buf.advance(chunk.len());
}
Ok(vec)
}
The returned Buf allows reading all data as if it were a single contiguous buffer.
use hyper::body::{aggregate, Body};
use bytes::Buf;
async fn buf_operations(body: Body) -> Result<(), hyper::Error> {
let mut buf = aggregate(body).await?;
// Buf provides cursor-based reading
println!("Remaining bytes: {}", buf.remaining());
// Get a slice of contiguous bytes at current position
// This may not be ALL bytes if there are multiple frames
let chunk: &[u8] = buf.chunk();
println!("First chunk size: {}", chunk.len());
// Advance the cursor
buf.advance(chunk.len());
// Continue reading
if buf.has_remaining() {
let next_chunk = buf.chunk();
// This accesses the next frame's data
}
Ok(())
}
// The chunk() method returns the NEXT contiguous slice
// For multi-frame buffers, you iterate through chunks
The Buf trait provides cursor-based traversal, returning contiguous slices one at a time.
use hyper::body::{aggregate, Body};
use bytes::Buf;
// WITHOUT aggregate: Manual collection with allocation
async fn without_aggregate(body: Body) -> Result<Vec<u8>, hyper::Error> {
use futures::StreamExt;
let mut stream = body; // Body implements Stream (hyper 0.14, "stream" feature)
let mut all_bytes = Vec::new();
while let Some(chunk) = stream.next().await {
let chunk = chunk?;
all_bytes.extend_from_slice(&chunk);
}
// Problem: If body has 10 chunks, we:
// 1. Allocate each chunk
// 2. Allocate a Vec to hold all
// 3. Copy each chunk into the Vec
Ok(all_bytes)
}
// WITH aggregate: Zero-copy concatenation
async fn with_aggregate(body: Body) -> Result<impl Buf, hyper::Error> {
let buf = aggregate(body).await?;
// The buf references original chunk allocations
// No intermediate Vec<u8> needed
// No copying between buffers
// You can still get contiguous bytes if needed:
// let bytes = buf.copy_to_bytes(buf.remaining()); // This DOES copy
// But you have the choice
Ok(buf)
}
aggregate avoids the overhead of collecting chunks into a new Vec<u8> allocation.
use hyper::body::{aggregate, Body};
// The aggregated buffer internally holds multiple Bytes references:
async fn inspect_aggregate(body: Body) -> Result<(), hyper::Error> {
let buf = aggregate(body).await?;
// The Buf implementation internally:
// - Stores references to all original frame allocations
// - Maintains a cursor pointing to current position
// - Returns contiguous slices from one frame at a time
// This is similar to bytes::Chain but optimized for HTTP frames
// It chains multiple Bytes together logically
Ok(())
}
// Conceptually, the aggregated buffer looks like:
// Frame 1: [0..100]
// Frame 2: [0..50]
// Frame 3: [0..200]
//
// Aggregate presents: [Frame 1 bytes][Frame 2 bytes][Frame 3 bytes]
// As one logical buffer with total size 350
The aggregated buffer stores references to original frame data, presenting them as one logical sequence.
use hyper::body::{aggregate, Body};
use bytes::Buf;
async fn copy_to_single_bytes(body: Body) -> Result<bytes::Bytes, hyper::Error> {
let mut buf = aggregate(body).await?;
// copy_to_bytes creates a NEW contiguous Bytes
// This DOES allocate and copy if there are multiple frames
let total_size = buf.remaining();
let contiguous: bytes::Bytes = buf.copy_to_bytes(total_size);
// Now contiguous is a single allocation
// Trade-off: you pay for the copy to get true contiguity
Ok(contiguous)
}
// Use when you NEED contiguous bytes (e.g., for APIs requiring &[u8])
// Avoid when you can work with Buf's chunked interface
copy_to_bytes allocates a single contiguous buffer when true contiguity is required.
use hyper::body::{aggregate, Body};
use bytes::Buf;
// Parse a simple binary protocol with fixed-size header
struct PacketHeader {
version: u8,
flags: u8,
length: u16,
sequence: u32,
}
async fn parse_packet(body: Body) -> Result<PacketHeader, Box<dyn std::error::Error>> {
let mut buf = aggregate(body).await?;
// Need exactly 8 bytes for the header
if buf.remaining() < 8 {
// hyper::Error has no public constructor, so return a boxed error instead
return Err("insufficient data for header".into());
}
// Buf provides get_* methods for reading primitives
// The plain variants read big-endian (network byte order); *_le variants also exist
let version = buf.get_u8();
let flags = buf.get_u8();
let length = buf.get_u16(); // Big-endian by default
let sequence = buf.get_u32();
Ok(PacketHeader {
version,
flags,
length,
sequence,
})
}
// Buf's get_* methods work across frame boundaries
// They advance the cursor and return values
The Buf trait's reading methods handle multi-frame buffers transparently.
use hyper::body::{aggregate, Body};
use bytes::Buf;
async fn process_streaming(body: Body) -> Result<(), hyper::Error> {
let mut buf = aggregate(body).await?;
// Process data in chunks without requiring full contiguity
while buf.has_remaining() {
// Get the current contiguous chunk
let chunk = buf.chunk();
// Process this chunk
process_chunk(chunk);
// Advance past processed data
buf.advance(chunk.len());
}
Ok(())
}
fn process_chunk(data: &[u8]) {
// Process a contiguous slice
// May be called multiple times for one body
println!("Processing {} bytes", data.len());
}
// This avoids the copy_to_bytes allocation entirely
// Works with data as it arrives in frames
For many use cases, processing chunks sequentially is more efficient than forcing contiguity.
use hyper::body::{aggregate, Body};
use bytes::Buf;
async fn to_bytes_example(body: Body) -> Result<Vec<u8>, hyper::Error> {
let mut buf = aggregate(body).await?;
// copy_to_bytes copies all remaining data
// into a single newly allocated, contiguous Bytes
let bytes = buf.copy_to_bytes(buf.remaining());
// Note: bytes::Bytes::copy_from_slice(buf.chunk())
// would only copy the FIRST chunk, not the whole body
Ok(bytes.to_vec())
}
// In bytes 1.x there is no Buf::to_bytes(); copy_to_bytes is the replacement
async fn to_bytes_method(body: Body) -> Result<bytes::Bytes, hyper::Error> {
let mut buf = aggregate(body).await?;
// copy_to_bytes(remaining) collects everything into one allocation
Ok(buf.copy_to_bytes(buf.remaining()))
}
When you need owned bytes, copy_to_bytes creates a contiguous allocation from all frames.
use bytes::{Buf, Bytes};
use futures::StreamExt;
use hyper::body::{aggregate, Body};
// Each approach consumes the body, so each takes its own
// Approach 1: Manual collection into Vec<Bytes>
async fn collect_chunks(body: Body) -> Vec<Bytes> {
// Body implements Stream (hyper 0.14 with the "stream" feature)
body.map(|chunk| chunk.unwrap()).collect().await
// Result: Vec<Bytes> - still multiple allocations
// Need to iterate through chunks
}
// Approach 2: Manual concatenation into Vec<u8>
async fn concat_chunks(mut body: Body) -> Vec<u8> {
let mut all = Vec::new();
while let Some(chunk) = body.next().await {
all.extend_from_slice(&chunk.unwrap());
}
// Result: Vec<u8> - single allocation with copies
all
}
// Approach 3: aggregate
async fn aggregate_body(body: Body) -> impl Buf {
aggregate(body).await.unwrap()
// Result: impl Buf - logical concatenation, no copies
// Can still get contiguous bytes with copy_to_bytes
}
aggregate provides a middle ground: logical contiguity without mandatory copying.
use bytes::Buf;
use hyper::body::{aggregate, Body};
use std::io::Read;
async fn read_via_reader(body: Body) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
let buf = aggregate(body).await?;
// Buf::reader() provides a std::io::Read implementation
// Blocking reads are fine here: the data is already in memory
let mut reader = buf.reader();
let mut scratch = [0u8; 1024];
let mut total = Vec::new();
loop {
let n = reader.read(&mut scratch)?;
if n == 0 {
break;
}
total.extend_from_slice(&scratch[..n]);
}
Ok(total)
}
// reader() creates a bridge between Buf and std::io::Read
// Useful for interfacing with code expecting a Read
The Buf::reader() method adapts the aggregated buffer to std::io::Read for compatibility.
use hyper::body::{aggregate, Body};
use bytes::Buf;
// For large bodies, aggregate holds all data in memory
// This is a trade-off vs streaming processing
async fn handle_large_body(body: Body) -> Result<(), hyper::Error> {
let buf = aggregate(body).await?;
// All frames are now held in memory
// If body is 1GB, 1GB is in memory
// For large bodies, consider streaming instead:
// - Process chunks as they arrive
// - Don't hold entire body in memory
// But aggregate is fine for:
// - Small-to-medium bodies
// - When you need complete body for parsing
// - When body must be processed in entirety
Ok(())
}
// Alternative for large bodies: streaming
async fn handle_streaming(mut body: Body) -> Result<(), hyper::Error> {
use futures::StreamExt;
// Body implements Stream (hyper 0.14 with the "stream" feature)
while let Some(chunk) = body.next().await {
let chunk = chunk?;
// Process chunk immediately
// Don't accumulate in memory
}
Ok(())
}
aggregate materializes the entire body in memory; streaming is better for large payloads.
use bytes::Buf;
use hyper::body::{aggregate, Body};
use serde::de::DeserializeOwned;
async fn parse_json<T: DeserializeOwned>(body: Body) -> Result<T, Box<dyn std::error::Error>> {
let buf = aggregate(body).await?;
// serde_json reads from the Buf through its std::io::Read adapter
let result: T = serde_json::from_reader(buf.reader())?;
// This avoids intermediate String/Vec allocations
// The Buf presents data directly to the JSON parser
Ok(result)
}
// The JSON parser reads through the Buf
// If body has multiple frames, they're presented logically contiguous
// The parser doesn't need to know about frame boundaries
aggregate enables direct parsing without intermediate String or Vec<u8> allocations.
use hyper::body::{aggregate, Body};
use bytes::Buf;
#[derive(Debug)]
struct Message {
header: Header,
payload: Vec<u8>,
}
#[derive(Debug)]
struct Header {
msg_type: u8,
flags: u8,
payload_len: u32,
}
async fn parse_message(body: Body) -> Result<Message, Box<dyn std::error::Error>> {
let mut buf = aggregate(body).await?;
// Guard first: the get_* methods panic if the buffer runs out of data
if buf.remaining() < 6 {
return Err("insufficient data for header".into());
}
// Parse header - Buf methods handle cross-frame reads
let msg_type = buf.get_u8();
let flags = buf.get_u8();
let payload_len = buf.get_u32();
let header = Header {
msg_type,
flags,
payload_len,
};
// Verify we have enough data
if buf.remaining() < payload_len as usize {
return Err("insufficient payload".into());
}
// Extract payload
let payload = buf.copy_to_bytes(payload_len as usize).to_vec();
Ok(Message { header, payload })
}
// Buf's get_* methods seamlessly read across frame boundaries
// If header spans two frames, get_u32 still returns correct value
Binary protocols benefit from Buf's ability to read across frame boundaries transparently.
use bytes::{Buf, Bytes};
// Simplified view of how aggregate works internally:
struct AggregatedBuf {
chunks: Vec<Bytes>,
current_chunk: usize,
position_in_chunk: usize,
}
impl Buf for AggregatedBuf {
fn remaining(&self) -> usize {
let mut remaining = 0;
for chunk in &self.chunks[self.current_chunk..] {
remaining += chunk.len();
}
remaining - self.position_in_chunk
}
fn chunk(&self) -> &[u8] {
if self.current_chunk >= self.chunks.len() {
return &[];
}
let chunk = &self.chunks[self.current_chunk];
&chunk[self.position_in_chunk..]
}
fn advance(&mut self, mut cnt: usize) {
while cnt > 0 && self.current_chunk < self.chunks.len() {
let remaining_in_chunk = self.chunks[self.current_chunk].len() - self.position_in_chunk;
if cnt < remaining_in_chunk {
self.position_in_chunk += cnt;
cnt = 0;
} else {
cnt -= remaining_in_chunk;
self.current_chunk += 1;
self.position_in_chunk = 0;
}
}
}
fn has_remaining(&self) -> bool {
self.remaining() > 0
}
}
// The actual implementation is more optimized:
// hyper keeps the frames in an internal buffer list (a deque of Bytes)
The aggregated buffer maintains references to all chunks and presents them logically as one.
Core mechanism:
- aggregate consumes a body's data stream
- It returns a Buf that chains all frames logically
- chunk() yields one contiguous slice per frame
- The Buf trait's get_* methods read across frame boundaries

Key operations:
// Get logical buffer
let mut buf = aggregate(body).await?;
// Read primitives across frame boundaries
let value: u32 = buf.get_u32();
// Process chunks without copying
while buf.has_remaining() {
let chunk = buf.chunk();
process(chunk);
buf.advance(chunk.len());
}
// Force contiguity when needed
let bytes: Bytes = buf.copy_to_bytes(buf.remaining());
// Bridge to AsyncRead
let reader = buf.reader();

Trade-offs:
| Approach | Memory | Contiguity | Copy |
|----------|--------|------------|------|
| aggregate | Holds all frames | Logical | No |
| copy_to_bytes | New allocation | Physical | Yes |
| Manual collect | Vec + frames | Neither | Yes |
| Streaming | Ongoing only | No | No |
When to use aggregate:
- You need the complete body before processing
- You want to parse with Buf trait methods (get_u8, get_u32, etc.)
- Bodies are small to medium sized

When to avoid:
- Very large bodies that should not be held in memory at once
- Workloads that can process each chunk as it arrives
Key insight: hyper::body::aggregate bridges the gap between streaming (memory-efficient but discontinuous) and buffering (contiguous but memory-intensive). It provides logical contiguity, the ability to read data sequentially with Buf methods, without requiring a single contiguous allocation. The Buf trait's cursor-based interface handles frame boundaries internally, so code reads primitives seamlessly across chunk boundaries. When true contiguity is needed, copy_to_bytes allocates and copies. This design lets you choose: process data in frames (no allocation), use the Buf interface (logical contiguity), or copy to bytes (physical contiguity), depending on your requirements.