Loading page…
Rust walkthroughs
Loading page…
How does hyper::Body implement Stream, and what are the implications for request/response handling? hyper::Body implements the Stream trait from the futures crate, allowing it to yield chunks of data asynchronously rather than loading entire request or response bodies into memory. This streaming design enables efficient handling of large payloads, real-time data, and memory-constrained environments. The body yields Bytes chunks as they arrive from the network, and the Stream implementation allows composing body processing with other async operations.
use hyper::Body;
use futures::StreamExt;
async fn stream_body() {
let body = Body::from("Hello, World!");
// Body implements Stream<Item = Result<Bytes, Error>>
let mut stream = body;
while let Some(chunk) = stream.next().await {
match chunk {
Ok(bytes) => {
println!("Received {} bytes", bytes.len());
println!("Content: {:?}", bytes);
}
Err(e) => {
eprintln!("Error: {}", e);
}
}
}
}The body yields Result<Bytes, hyper::Error> items, where each Bytes is a reference-counted slice of data.
use futures::stream::Stream;
use hyper::Body;
// Conceptual implementation:
// impl Stream for Body {
// type Item = Result<Bytes, hyper::Error>;
//
// fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
// -> Poll<Option<Self::Item>>
// {
// // Polls for the next chunk of data from the network
// // Returns:
// // - Poll::Ready(Some(Ok(chunk))) - received a chunk
// // - Poll::Ready(Some(Err(e))) - error occurred
// // - Poll::Ready(None) - stream ended
// // - Poll::Pending - waiting for data
// }
// }
/// Collects an entire body into one contiguous `Bytes` buffer.
async fn process_stream() {
    let body = Body::from("Hello, World!");
    // `hyper::body::to_bytes` drives the stream to completion itself,
    // so no `StreamExt` import is needed here (the original imported
    // `futures::StreamExt` without using it).
    let bytes = hyper::body::to_bytes(body).await.unwrap();
    println!("Total: {} bytes", bytes.len());
}
The poll_next method drives the async I/O that retrieves body data.
use hyper::{Body, Client, Request};
use hyper::body::Bytes;
use futures::StreamExt;
/// Streams a large download chunk-by-chunk, reporting progress as it goes.
async fn fetch_large_file() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::new();
    let uri = "http://example.com/large-file.zip".parse()?;
    let response = client.get(uri).await?;
    // Pull the body out and drain it one chunk at a time; the whole
    // file never needs to be resident in memory at once.
    let mut body_stream = response.into_body();
    let mut received = 0;
    while let Some(next) = body_stream.next().await {
        let chunk: Bytes = next?;
        received += chunk.len();
        process_chunk(&chunk);
        println!("Progress: {} bytes", received);
    }
    println!("Complete: {} bytes", received);
    Ok(())
}
/// Placeholder consumer for one body chunk.
fn process_chunk(chunk: &[u8]) {
    // Each chunk could be written to a file, parsed incrementally,
    // hashed, etc. This demo intentionally does nothing with it.
    let _ = chunk;
}
Streaming enables processing files larger than available memory.
use hyper::Body;
use futures::StreamExt;
/// Incrementally parses newline-delimited JSON (NDJSON) from a body stream.
async fn parse_json_stream() {
    // Simulated server-sent-events / NDJSON payload.
    let body = Body::from(
        r#"{"event":"start","id":1}
{"event":"data","id":2}
{"event":"data","id":3}
{"event":"end","id":4}
"#
    );
    let mut incoming = body;
    let mut pending = Vec::new();
    while let Some(chunk) = incoming.next().await {
        pending.extend_from_slice(&chunk.unwrap());
        // Peel off every complete line currently buffered; a partial
        // trailing line stays in `pending` until more data arrives.
        while let Some(end) = pending.iter().position(|&b| b == b'\n') {
            let line: Vec<u8> = pending.drain(..=end).collect();
            let text = String::from_utf8_lossy(&line);
            if text.trim().is_empty() {
                continue;
            }
            // Each complete line is parsed as a standalone JSON object.
            if let Ok(event) = serde_json::from_str::<Event>(&text) {
                println!("Event: {:?}", event);
            }
        }
    }
}
/// One line-delimited JSON event of the form `{"event":"...","id":N}`.
#[derive(Debug, serde::Deserialize)]
struct Event {
    // Event type tag, e.g. "start", "data", "end".
    event: String,
    // Numeric event id.
    id: u32,
}
Streaming parsing allows handling JSON lines as they arrive.
use hyper::Body;
use futures::StreamExt;
/// Shows that `Stream`-based bodies give the consumer automatic backpressure.
async fn backpressure_example() {
    let mut body_stream = Body::from(vec![0u8; 1_000_000]); // 1MB body
    let mut consumed = 0;
    // Data is only pulled when we poll: the transport layer cannot
    // outrun a slow consumer. Processing a chunk before requesting the
    // next one is backpressure for free.
    while let Some(chunk) = body_stream.next().await {
        let bytes = chunk.unwrap();
        // Simulate slow processing of each chunk.
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        consumed += bytes.len();
        // The network stack won't deliver more data until we poll again.
    }
}The consumer controls the pace by only polling when ready.
use hyper::Body;
use futures::stream;
use bytes::Bytes;
async fn create_streaming_body() {
// Create body from a stream of chunks
let chunks = vec![
Ok(Bytes::from("First chunk\n")),
Ok(Bytes::from("Second chunk\n")),
Ok(Bytes::from("Third chunk\n")),
];
let stream = stream::iter(chunks);
let body = Body::wrap_stream(stream);
// This body can be used in a request or response
// Each chunk will be sent as it becomes available
}
// Stream from async source
/// Builds a `Body` whose chunks are produced lazily by an async generator.
async fn create_async_streaming_body() {
    let stream = async_stream::stream! {
        for i in 0..5 {
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            // Error type annotated so `Body::wrap_stream` can infer its
            // generic error bound (bare `Ok(...)` leaves `E` ambiguous).
            yield Ok::<_, hyper::Error>(Bytes::from(format!("Chunk {}\n", i)));
        }
    };
    let _body = Body::wrap_stream(stream);
    // The body yields chunks with delays between them.
}Body::wrap_stream converts any stream into a body.
use hyper::{Body, Client, Request, Method};
use futures::stream;
use bytes::Bytes;
/// POSTs a request whose body is streamed chunk-by-chunk.
async fn streaming_upload() -> Result<(), Box<dyn std::error::Error>> {
    // Pre-built chunks; `Ok::<_, hyper::Error>` pins the stream's error
    // type for `wrap_stream`.
    let parts = vec![
        Bytes::from("Part 1 of data\n"),
        Bytes::from("Part 2 of data\n"),
        Bytes::from("Part 3 of data\n"),
    ];
    let chunk_stream = stream::iter(parts.into_iter().map(Ok::<_, hyper::Error>));
    let body = Body::wrap_stream(chunk_stream);
    // The request carries the streaming body; hyper sends chunks as
    // they become available rather than buffering them all first.
    let request = Request::builder()
        .method(Method::POST)
        .uri("http://example.com/upload")
        .header("Content-Type", "application/octet-stream")
        .body(body)?;
    let response = Client::new().request(request).await?;
    println!("Upload status: {}", response.status());
    Ok(())
}Streaming uploads avoid loading entire files into memory.
use hyper::{Body, Client, Request, Method};
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use futures::stream::StreamExt;
/// Uploads a file by streaming its contents straight into the request body.
async fn upload_file(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    use tokio_util::io::ReaderStream;
    // `ReaderStream` adapts any `AsyncRead` into a stream of `Bytes`
    // chunks, which `wrap_stream` then turns into a request body.
    let file = File::open(path).await?;
    let body = Body::wrap_stream(ReaderStream::new(file));
    let request = Request::builder()
        .method(Method::POST)
        .uri("http://example.com/upload")
        .body(body)?;
    let response = Client::new().request(request).await?;
    println!("Upload status: {}", response.status());
    Ok(())
}Files stream directly to the network without buffering in memory.
use hyper::{Body, Request, Response, Server};
use hyper::service::{make_service_fn, service_fn};
use futures::stream;
use bytes::Bytes;
/// Builds a server-sent-events style response whose body is generated lazily.
async fn streaming_response() -> Result<(), Box<dyn std::error::Error>> {
    // One event every 100ms. The error type is annotated so
    // `Body::wrap_stream` can infer its generic error bound (bare
    // `Ok(...)` leaves it ambiguous and fails to compile).
    let stream = async_stream::stream! {
        for i in 0..10 {
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            yield Ok::<_, hyper::Error>(Bytes::from(format!("Event {}: Data\n", i)));
        }
    };
    let body = Body::wrap_stream(stream);
    let _response = Response::builder()
        .status(200)
        .header("Content-Type", "text/event-stream")
        .body(body)?;
    // This response streams events to the client as they are produced.
    Ok(())
}Server-sent events use streaming bodies naturally.
use hyper::{Body, body};
use futures::StreamExt;
/// Tour of hyper's body helper functions.
async fn body_utilities() {
    // Collect an entire body into a single contiguous `Bytes`.
    let body = Body::from("Hello, World!");
    let bytes = body::to_bytes(body).await.unwrap();
    println!("Bytes: {:?}", bytes);
    // Same collection, then convert into an owned Vec<u8>.
    let body = Body::from("Hello, World!");
    let vec = body::to_bytes(body).await.unwrap().to_vec();
    println!("Vec: {:?}", vec);
    // `aggregate` yields an `impl Buf` over the chunks without copying
    // them into one contiguous allocation. (The original also built a
    // second body here that was never used; that dead local is removed.)
    let body = Body::from("Hello, ");
    let _combined = body::aggregate(body).await.unwrap();
}
// Streaming with limit
/// Caps how much of a body is consumed before bailing out.
async fn limited_streaming() {
    let mut body = Body::from(vec![0u8; 100]);
    // NOTE(review): hyper 0.14 has no `Body::limit`; the size cap is
    // enforced manually below to guard against memory exhaustion.
    let mut seen = 0;
    while let Some(chunk) = body.next().await {
        let bytes = chunk.unwrap();
        seen += bytes.len();
        if seen > 50 {
            println!("Warning: Large body");
            break;
        }
    }
}Hyper provides utilities for common body operations.
use hyper::Body;
use tokio::sync::mpsc;
/// Feeds a body dynamically from another task via hyper's channel API.
///
/// The original used `tokio::sync::mpsc` + `Body::wrap_stream(rx)`, which
/// has two defects: `Bytes::from` was called without `Bytes` in scope, and
/// tokio 1.x `Receiver` does not implement `Stream` (it would need a
/// `ReceiverStream` wrapper from the separate `tokio-stream` crate).
/// `Body::channel()` provides the same pattern with hyper alone.
async fn channel_body() {
    let (mut tx, body) = Body::channel();
    // Send chunks from another task; the body ends when `tx` is dropped.
    tokio::spawn(async move {
        for i in 0..5 {
            let chunk = bytes::Bytes::from(format!("Chunk {}\n", i));
            tx.send_data(chunk).await.unwrap();
        }
    });
    // Drain the body as the other task produces chunks.
    use futures::StreamExt;
    let mut stream = body;
    while let Some(chunk) = stream.next().await {
        println!("Got: {:?}", chunk.unwrap());
    }
}Channels allow dynamic body generation from multiple sources.
use hyper::Body;
use futures::StreamExt;
/// Demonstrates per-chunk error handling while draining a body.
async fn handle_body_errors() {
    let mut incoming = Body::from("data");
    while let Some(item) = incoming.next().await {
        match item {
            // Successful chunk: just report its size.
            Ok(chunk) => println!("Got {} bytes", chunk.len()),
            Err(e) => {
                // Possible causes: network error, timeout, protocol error.
                eprintln!("Body error: {}", e);
                // After an error the stream is finished (the next poll
                // would yield None), so stop here.
                break;
            }
        }
    }
}
// Propagate errors in streaming
/// Accumulates a whole body, letting `?` forward any transport error.
async fn propagate_errors() -> Result<Vec<u8>, hyper::Error> {
    let mut incoming = Body::from("data");
    let mut collected = Vec::new();
    while let Some(chunk) = incoming.next().await {
        // `?` surfaces a chunk error to the caller immediately.
        collected.extend_from_slice(&chunk?);
    }
    Ok(collected)
}Errors in streaming bodies propagate through the Result.
use hyper::Body;
use futures::StreamExt;
/// Applies `Stream` combinators to body chunks.
///
/// Fixes two compile errors in the original: `Bytes` has no `map` method
/// (bytes are transformed via an iterator instead), and `StreamExt::filter`
/// requires a predicate returning a `bool` future — keeping/dropping items
/// by value is `filter_map`'s job.
async fn compose_streams() {
    let body = Body::from(vec![
        b'H', b'e', b'l', b'l', b'o'
    ]);
    // Map chunks: uppercase each byte, rebuilding an owned buffer
    // (`Vec<u8>` satisfies `wrap_stream`'s `Into<Bytes>` bound).
    let mapped = body.map(|result| {
        result.map(|bytes| {
            bytes.iter().map(|b| b.to_ascii_uppercase()).collect::<Vec<u8>>()
        })
    });
    // Collect the transformed body.
    let result = hyper::body::to_bytes(Body::wrap_stream(mapped)).await.unwrap();
    println!("Uppercase: {:?}", result); // "HELLO"
    // Filter chunks: drop errored items, keep successful chunks.
    let body = Body::from("Hello\nWorld\nTest");
    let _filtered = body.filter_map(|result| async move {
        result.ok().map(Ok::<_, hyper::Error>)
    });
}Stream combinators enable transformations on body data.
use hyper::Body;
use futures::StreamExt;
/// Contrasts buffering a whole body against streaming it incrementally.
async fn memory_efficient_comparison() {
    // BAD: load the entire body into memory at once.
    let body = Body::from(vec![0u8; 100_000_000]); // 100MB
    let _all_bytes = hyper::body::to_bytes(body).await.unwrap();
    // Now 100MB is resident in memory!

    // GOOD: stream and process incrementally.
    let body = Body::from(vec![0u8; 100_000_000]);
    let mut stream = body;
    // Incremental hashing with the standard library. (The original used
    // `blake3`, explicitly marked "Hypothetical" — not a real dependency
    // of this file, so it could never compile.)
    use std::hash::Hasher;
    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    while let Some(chunk) = stream.next().await {
        let chunk = chunk.unwrap();
        hasher.write(&chunk); // process incrementally
        // Memory: only the current chunk is resident.
    }
    // Peak memory: roughly one chunk instead of the full 100MB.
}Streaming keeps memory usage bounded regardless of body size.
// In hyper 1.0+, Body is replaced by http_body::Body trait
// The concepts remain the same:
// hyper 0.14:
use hyper::Body;
let body = Body::from("data");
let bytes = hyper::body::to_bytes(body).await?;
// hyper 1.0:
use hyper::body::Incoming;
use http_body_util::BodyExt;
// Incoming implements http_body::Body trait
async fn hyper_1_example() {
// Body is now a trait, not a concrete type
// Use BodyExt for utilities
// BodyExt provides:
// - collect() -> Collected body parts
// - frame() -> Stream of frames (data + trailers)
// - map() -> Transform body chunks
// - boxed() -> Type-erased body
}Hyper 1.0 uses the http_body crate's Body trait.
use hyper::{Body, Request, Response, Server, Method, StatusCode};
use hyper::service::{make_service_fn, service_fn};
use futures::StreamExt;
/// Routes requests, demonstrating a streaming response, a zero-copy body
/// echo, and incremental body consumption.
///
/// Fix: the `/stream` arm's `yield Ok(...)` left the stream's error type
/// ambiguous for `Body::wrap_stream`; it is now annotated explicitly.
async fn handle_request(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
    match (req.method(), req.uri().path()) {
        (&Method::GET, "/stream") => {
            // Streaming response: one event every 500ms.
            let events = async_stream::stream! {
                for i in 0..5 {
                    tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
                    yield Ok::<_, hyper::Error>(bytes::Bytes::from(format!("Event {}\n", i)));
                }
            };
            Ok(Response::builder()
                .status(StatusCode::OK)
                .header("Content-Type", "text/event-stream")
                .body(Body::wrap_stream(events))
                .unwrap())
        }
        (&Method::POST, "/echo") => {
            // Echo: the request body streams straight back out unchanged.
            Ok(Response::new(req.into_body()))
        }
        (&Method::POST, "/process") => {
            // Consume the streaming body chunk-by-chunk, counting bytes.
            let mut incoming = req.into_body();
            let mut total = 0;
            while let Some(chunk) = incoming.next().await {
                let chunk = chunk?;
                total += chunk.len();
                // Process chunk...
            }
            Ok(Response::new(Body::from(format!("Processed {} bytes", total))))
        }
        _ => {
            Ok(Response::builder()
                .status(StatusCode::NOT_FOUND)
                .body(Body::from("Not Found"))
                .unwrap())
        }
    }
}Full streaming request/response handling in a server.
Body implementing Stream enables efficient async I/O:
Key implications:
| Aspect | Impact |
|--------|--------|
| Memory usage | Bounded by chunk size, not body size |
| Backpressure | Automatic via Stream polling |
| Composability | Works with Stream combinators |
| Large files | Stream without loading into memory |
| Real-time data | Process as data arrives |
Core pattern:
// Stream a body
let mut stream = body;
while let Some(chunk) = stream.next().await {
let chunk: Result<Bytes, Error> = chunk;
// Process chunk
}
// Create streaming body
let stream = some_async_source();
let body = Body::wrap_stream(stream);

When to use streaming: large or unbounded payloads, file uploads and downloads, real-time feeds such as server-sent events, and memory-constrained services.

When to_bytes is fine: small, bounded bodies (typical API JSON payloads, form posts) where buffering the whole body is cheap and keeps the code simpler.

The Stream implementation transforms body handling from "load everything" to "process as it arrives," which is fundamental to building efficient async web services.