Loading page…
Rust walkthroughs
Loading page…
reqwest::Body::wrap_stream and from_bytes for streaming vs static content?
Body::from_bytes loads the entire request body into memory before transmission, while wrap_stream allows sending data as it becomes available, reducing memory pressure and enabling transfer of data larger than available RAM. The choice between these approaches represents a fundamental trade-off: static content is simpler and allows for automatic retry and content-length headers, while streaming content enables memory-efficient transfer of large or dynamically-generated data but requires careful handling of errors and backpressure. from_bytes is appropriate for small, known-size payloads where simplicity matters; wrap_stream is essential for large files, real-time data, or when memory consumption must be bounded.
use reqwest::Body;
/// Builds a static (fully in-memory) body from an owned byte buffer.
///
/// Note: `reqwest::Body` has no public `from_bytes` constructor; byte
/// bodies are created through the `From<Vec<u8>>` impl.
fn create_static_body() -> Body {
    let data = b"Hello, World!".to_vec();
    Body::from(data)
}
/// Builds a static body from a string literal; `Body: From<&'static str>`
/// handles the conversion.
fn create_static_body_from_string() -> Body {
    Body::from(r#"{"name": "example", "value": 42}"#)
}
#[tokio::main]
async fn main() {
    let client = reqwest::Client::new();
    // Static body: the full 1 KiB buffer is in memory before transmission.
    // Built via `From<Vec<u8>>` — reqwest has no `Body::from_bytes`.
    let body = Body::from(vec![0u8; 1024]);
    // `response` is a Result; a real program would inspect it.
    let response = client
        .post("https://example.com/upload")
        .body(body)
        .send()
        .await;
}
from_bytes creates a body from a complete buffer, sending it all at once.
use reqwest::Body;
use futures::stream::{self, StreamExt};
/// Builds a streaming body from a fixed sequence of byte chunks.
///
/// `wrap_stream` accepts any stream of `Result<T, E>` where `T: Into<Bytes>`
/// and `E: Into<Box<dyn Error + Send + Sync>>`.
fn create_streaming_body() -> Body {
    let stream = stream::iter(vec![
        Ok::<_, reqwest::Error>(b"chunk1".to_vec()),
        Ok(b"chunk2".to_vec()),
        // Fixed: `.toVec()` is not a method — Rust uses snake_case `to_vec`.
        Ok(b"chunk3".to_vec()),
    ]);
    Body::wrap_stream(stream)
}
#[tokio::main]
async fn main() {
    // wrap_stream: chunks are transmitted as the stream yields them.
    let client = reqwest::Client::new();
    let streamed = create_streaming_body();
    let response = client
        .post("https://example.com/upload")
        .body(streamed)
        .send()
        .await;
}
wrap_stream creates a body from an async stream, sending chunks as they become available.
use reqwest::Body;
use futures::stream::{StreamExt, TryStreamExt};
use tokio::fs::File;
use tokio::io::AsyncReadExt;
// from_bytes: loads entire file into memory
async fn upload_file_static(path: &str) -> Result<(), Box<dyn std::error::Error>> {
let client = reqwest::Client::new();
// Entire file loaded into Vec<u8>
let mut file = File::open(path).await?;
let mut buffer = Vec::new();
file.read_to_end(&mut buffer).await?;
// Peak memory: file size + overhead
println!("Buffer size: {} bytes", buffer.len());
let body = Body::from_bytes(buffer);
client
.post("https://example.com/upload")
.body(body)
.send()
.await?;
Ok(())
}
// Streaming upload: the file is never fully resident in memory.
async fn upload_file_streaming(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();
    // Opening the file does not read it; ReaderStream pulls chunks from it
    // during transmission, so memory holds only the current chunk + buffers.
    let source = File::open(path).await?;
    let body = Body::wrap_stream(tokio_util::io::ReaderStream::new(source));
    client
        .post("https://example.com/upload")
        .body(body)
        .send()
        .await?;
    Ok(())
}
from_bytes requires memory for the entire payload; wrap_stream needs only chunk-sized buffers.
use reqwest::Body;
/// Demonstrates that an in-memory body carries a known length, so reqwest
/// sets `Content-Length: 13` automatically and the server can show
/// progress ("13 bytes total").
fn static_with_content_length() {
    let data = b"Hello, World!".to_vec();
    // Underscore-prefixed: illustrative only, never consumed.
    let _len = data.len(); // 13
    // Built via `From<Vec<u8>>` — reqwest has no `Body::from_bytes`.
    let _body = Body::from(data);
}
/// Demonstrates that a wrapped stream has no known total size: reqwest
/// falls back to `Transfer-Encoding: chunked`, sends no Content-Length
/// header, and the server cannot know the total size upfront.
fn streaming_without_length() {
    use futures::stream::{self, StreamExt};
    let chunks = vec![Ok::<_, reqwest::Error>(b"chunk1".to_vec())];
    let body = Body::wrap_stream(stream::iter(chunks));
}
from_bytes enables Content-Length; wrap_stream typically uses Transfer-Encoding: chunked.
use reqwest::Body;
use futures::stream::StreamExt;
fn streaming_with_known_length() -> Body {
// If you know the size, wrap_stream still works
// but you can set Content-Length manually
let chunks: Vec<Result<Vec<u8>, reqwest::Error>> = vec
![
Ok(b"part1".to_vec()),
Ok(b"part2".to_vec()),
];
let known_length: u64 = 10; // "part1".len() + "part2".len()
Body::wrap_stream(futures::stream::iter(chunks))
// Note: Content-Length not automatically set
// Need to set on request builder
}
#[tokio::main]
async fn main() {
    let client = reqwest::Client::new();
    let streaming_body = streaming_with_known_length();
    // Content-Length must be supplied by hand for a wrapped stream.
    client
        .post("https://example.com/upload")
        .header("Content-Length", "10")
        .body(streaming_body)
        .send()
        .await;
}
When using wrap_stream, set Content-Length explicitly if known for better server handling.
use reqwest::Body;
use futures::stream::{self, StreamExt};
// Static body: the source bytes stay in memory, so a retry can rebuild it.
async fn with_retry_static() {
    let client = reqwest::Client::new();
    // Built via `From<Vec<u8>>` — reqwest has no `Body::from_bytes`.
    let body = Body::from(b"data".to_vec());
    client
        .post("https://example.com/api")
        .body(body)
        .send()
        .await
        .expect("Failed");
    // A retry would need a fresh Body, but the data is still in memory,
    // so recreating it is trivial.
}
// Streaming body: the send consumes the stream, so there is no retry.
async fn with_retry_streaming() {
    let client = reqwest::Client::new();
    let chunks = vec![Ok::<_, reqwest::Error>(b"data".to_vec())];
    let body = Body::wrap_stream(stream::iter(chunks));
    client
        .post("https://example.com/api")
        .body(body)
        .send()
        .await
        .expect("Failed");
    // The stream has been drained and cannot be resent; a retry requires
    // rebuilding the stream from its original source.
}
from_bytes bodies can be recreated from in-memory data; wrap_stream consumes the stream.
use reqwest::Body;
use tokio::fs::File;
use tokio_util::io::ReaderStream;
use futures::stream::StreamExt;
async fn upload_large_file(path: &str) -> Result<(), Box<dyn std::error::Error>> {
let client = reqwest::Client::new();
// Open file - doesn't load into memory
let file = File::open(path).await?;
let metadata = file.metadata().await?;
let file_size = metadata.len();
// Create streaming body
let stream = ReaderStream::new(file);
let body = Body::wrap_stream(stream);
// Can upload files larger than RAM
println!("Uploading {} bytes...", file_size);
let response = client
.post("https://example.com/upload")
.header("Content-Length", file_size)
.body(body)
.send()
.await?;
println!("Response: {}", response.status());
Ok(())
}
// This works for files of any size
// Memory usage stays constant regardless of file sizeStreaming is essential for files larger than available memory.
use reqwest::Body;
use futures::stream::{StreamExt, TryStreamExt};
async fn stream_generated_content() -> Body {
// Generate content on-the-fly, stream chunks
let stream = futures::stream::iter(0..5)
.then(|i| async move {
// Simulate async content generation
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let chunk = format!("Chunk {}: some data\n", i);
Ok::<_, reqwest::Error>(chunk.into_bytes())
});
Body::wrap_stream(stream)
}
// Usage: content is generated while it is being sent — the full payload is
// never buffered in memory.
#[tokio::main]
async fn main() {
    let client = reqwest::Client::new();
    let generated = stream_generated_content().await;
    client
        .post("https://example.com/stream")
        .body(generated)
        .send()
        .await
        .expect("Failed to send");
}
wrap_stream enables sending dynamically generated content without buffering.
use reqwest::Body;
use futures::stream::{StreamExt, TryStreamExt};
// Simulated database row type
struct Row {
id: i32,
data: String,
}
async fn stream_database_query() -> Body {
// Simulate streaming database results
let stream = futures::stream::iter(0..100)
.map(|i| Row {
id: i,
data: format!("Data for row {}", i),
})
.map(|row| {
// Convert each row to JSON bytes
let json = serde_json::to_string(&row).unwrap();
Ok::<_, reqwest::Error>(json.into_bytes())
});
Body::wrap_stream(stream)
}
// Alternative: materialize every row before serializing.
async fn collect_database_query() -> Body {
    // All 100 rows are held in memory at once, then serialized as a single
    // JSON array.
    let mut all_rows = Vec::new();
    for i in 0..100 {
        let row = Row {
            id: i,
            data: format!("Data for row {}", i),
        };
        all_rows.push(row);
    }
    let encoded = serde_json::to_string(&all_rows).unwrap();
    Body::from(encoded)
}
Streaming database results avoids loading all rows into memory simultaneously.
use reqwest::Body;
use futures::stream::{self, StreamExt};
/// Builds a stream that fails mid-way, aborting the request body.
///
/// `reqwest::Error` has no public constructor (`reqwest::error::Kind` is
/// private), so a hand-built error stream must use its own error type;
/// `wrap_stream` accepts any `E: Into<Box<dyn Error + Send + Sync>>`.
fn stream_with_errors() -> Body {
    let stream = stream::iter(vec![
        Ok::<_, std::io::Error>(b"valid data".to_vec()),
        Err(std::io::Error::new(std::io::ErrorKind::Other, "stream failed")),
        Ok(b"more data".to_vec()), // Never sent: the error aborts the body.
    ]);
    Body::wrap_stream(stream)
}
// When streaming, an error item aborts the request mid-flight, so the
// client must handle partial transmission.
// Better approach: deal with fallible sources inside the stream itself.
fn stream_with_recovery() -> Body {
    let stream = stream::iter(vec![
        Ok::<_, reqwest::Error>(b"chunk1".to_vec()),
        // Fixed: `.toVec()` is not a method — Rust uses snake_case `to_vec`.
        Ok(b"chunk2".to_vec()),
        Ok(b"chunk3".to_vec()),
    ]);
    // Use try_take_while or similar combinators to stop cleanly on errors.
    Body::wrap_stream(stream)
}
// Static bodies have no stream-level failure mode: the data is complete
// before the request starts.
fn static_no_stream_errors() -> Body {
    // Built via `From<Vec<u8>>` — reqwest has no `Body::from_bytes`.
    Body::from(b"complete data".to_vec())
}
Stream errors abort transmission; static bodies have no stream-level errors.
use reqwest::Body;
use futures::stream::{StreamExt, TryStreamExt};
use tokio::time::{sleep, Duration};
async fn streaming_with_backpressure() -> Body {
let stream = futures::stream::iter(0..100)
.throttle(Duration::from_millis(10)) // Rate limit
.map(|i| {
let chunk = format!("Item {}\n", i);
Ok::<_, reqwest::Error>(chunk.into_bytes())
});
Body::wrap_stream(stream)
}
// Backpressure: if the server can't receive fast enough, the stream
// producer simply isn't polled — waiting is automatic in async streams.
async fn static_no_backpressure() -> Body {
    // No backpressure is possible: the complete payload already sits in
    // memory before the request begins.
    let data: Vec<u8> = (0..100)
        .flat_map(|i| format!("Item {}\n", i).into_bytes())
        .collect();
    // Built via `From<Vec<u8>>` — reqwest has no `Body::from_bytes`.
    Body::from(data)
}
Streaming respects backpressure; static bodies send all at once.
use reqwest::Body;
use futures::stream::StreamExt;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
// Streaming downloads (the response side of the same trade-off).
async fn download_large_file(url: &str, path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();
    let response = client.get(url).send().await?;
    // Copy the response body to disk chunk by chunk.
    let mut out = File::create(path).await?;
    let mut chunks = response.bytes_stream();
    while let Some(next) = chunks.next().await {
        let bytes = next?;
        out.write_all(&bytes).await?;
    }
    out.flush().await?;
    Ok(())
}
// Compare with buffering the entire response in memory.
async fn download_to_memory(url: &str) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();
    let response = client.get(url).send().await?;
    // `.bytes()` collects the whole body into one buffer.
    let buffered = response.bytes().await?;
    Ok(buffered.to_vec())
}
Download streaming mirrors upload streaming: bytes_stream() vs .bytes().
use reqwest::{Body, multipart};
use tokio::fs::File;
use tokio_util::io::ReaderStream;
/// Attaches a streamed file to a multipart form, so even huge files can be
/// uploaded without loading them into memory.
async fn multipart_with_stream() -> Result<reqwest::multipart::Form, Box<dyn std::error::Error>> {
    let file = File::open("large_file.bin").await?;
    let streaming_part =
        multipart::Part::stream(Body::wrap_stream(ReaderStream::new(file)));
    let form = multipart::Form::new()
        .text("field", "value")
        .part("file", streaming_part);
    Ok(form)
}
/// Attaches a small in-memory payload to a multipart form.
async fn multipart_with_static() -> multipart::Form {
    let file_content = b"file content".to_vec();
    let file_part = multipart::Part::bytes(file_content).file_name("file.txt");
    multipart::Form::new()
        .text("field", "value")
        .part("file", file_part)
}
Multipart forms support both streaming and static content for different parts.
use reqwest::Body;
use futures::stream::{StreamExt, TryStreamExt};
use std::time::Instant;
/// Times construction of a static body; peak memory is `size` bytes.
async fn benchmark_static(size: usize) -> std::time::Duration {
    let start = Instant::now();
    // Allocate the full buffer up front.
    let data = vec![0u8; size];
    // Built via `From<Vec<u8>>` — reqwest has no `Body::from_bytes`.
    let _body = Body::from(data);
    start.elapsed()
}
async fn benchmark_streaming(size: usize, chunk_size: usize) -> std::time::Duration {
let start = Instant::now();
// Stream in chunks
let stream = futures::stream::iter(0..size/chunk_size)
.map(|_| Ok::<_, reqwest::Error>(vec
![0u8; chunk_size]));
let body = Body::wrap_stream(stream);
// Memory peak: chunk_size bytes (approximately)
start.elapsed()
}
// Trade-offs:
// Static:
// + Lower CPU overhead (single buffer)
// + Automatic Content-Length
// + Can retry without source access
// - Memory scales with body size
// - Latency before first byte sent
// Streaming:
// + Constant memory usage
// + First byte sent immediately
// + Can send infinite/indefinite content
// - Higher per-chunk overhead
// - Stream consumed on error
// - No automatic Content-Length
Choose based on constraints: memory vs. CPU overhead, retry requirements, content size.
// Use from_bytes when:
// 1. Body size is small and known
// 2. Memory is not constrained
// 3. You need Content-Length automatically
// 4. Retry support without source access
fn small_static_body() -> Body {
    // A small JSON request body.
    let payload = r#"{"id": 123, "name": "test"}"#;
    Body::from(payload)
}
/// Builds a static body from a small binary payload embedded at compile
/// time. `Body::from` takes the owned Vec — reqwest has no `from_bytes`.
fn binary_static_body() -> Body {
    let data = include_bytes!("small_file.bin");
    Body::from(data.to_vec())
}
// Use wrap_stream when:
// 1. Body size is large or unknown
// 2. Memory must be bounded
// 3. Content is generated dynamically
// 4. Transferring from another stream (proxy)
async fn large_file_stream(path: &str) -> Body {
    // `expect` states the invariant instead of a bare `unwrap`; a
    // production API would return a Result rather than panic on I/O.
    let file = tokio::fs::File::open(path)
        .await
        .expect("failed to open file for streaming upload");
    let stream = tokio_util::io::ReaderStream::new(file);
    Body::wrap_stream(stream)
}
/// Streams 1000 generated lines as a request body.
///
/// The error type of each item must be spelled out: `wrap_stream` requires
/// a concrete `E: Into<Box<dyn Error + Send + Sync>>`, and a bare
/// `Ok(...)` leaves it ambiguous for type inference.
async fn generated_stream() -> Body {
    let stream = futures::stream::iter(0..1000)
        .map(|i| Ok::<_, std::io::Error>(format!("Line {}\n", i).into_bytes()));
    Body::wrap_stream(stream)
}
Small payloads favor from_bytes; large or dynamic content requires wrap_stream.
use reqwest::{Body, Client};
use tokio::fs::File;
use tokio_util::io::ReaderStream;
use std::path::Path;
// Wraps a shared reqwest HTTP client; the upload strategy (static vs
// streaming) is chosen per call in the impl below.
struct UploadService {
client: Client,
}
impl UploadService {
    /// Small files: read fully into memory and send as a static body.
    async fn upload_small_file(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
        let contents = tokio::fs::read(path).await?;
        self.client
            .post("https://api.example.com/upload")
            // Byte bodies are built via `From<Vec<u8>>` — reqwest has no
            // `Body::from_bytes` constructor.
            .body(Body::from(contents))
            .send()
            .await?;
        Ok(())
    }
    /// Large files: stream from disk so memory use stays bounded.
    async fn upload_large_file(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
        let file = File::open(path).await?;
        let size = file.metadata().await?.len();
        // Content-Length is set manually: a wrapped stream carries no
        // known length of its own.
        let body = Body::wrap_stream(ReaderStream::new(file));
        self.client
            .post("https://api.example.com/upload")
            .header("Content-Length", size)
            .body(body)
            .send()
            .await?;
        Ok(())
    }
    /// Dispatch: static upload at or below 10 MB, streaming above.
    async fn upload_file(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
        const STREAM_THRESHOLD: u64 = 10 * 1024 * 1024; // 10 MB
        let metadata = tokio::fs::metadata(path).await?;
        if metadata.len() > STREAM_THRESHOLD {
            self.upload_large_file(path).await
        } else {
            self.upload_small_file(path).await
        }
    }
}
Hybrid approach: use static for small files, streaming for large files.
Core trade-offs:
| Aspect | from_bytes | wrap_stream |
|--------|-------------|---------------|
| Memory | Scales with body size | Constant (chunk buffer) |
| Content-Length | Automatic | Manual or chunked |
| Retry | Easy (data in memory) | Requires source access |
| Latency | Wait for complete buffer | First byte immediately |
| Error handling | Simple (complete body) | Stream errors abort |
| Use case | Small, known content | Large or dynamic content |
from_bytes characteristics:
// Pros:
// - Simple API
// - Automatic Content-Length
// - Can retry without source access
// - Lower CPU overhead
// - No stream error handling
// Cons:
// - Memory scales with body size
// - Latency before transmission starts
// - Can't exceed available memory
wrap_stream characteristics:
// Pros:
// - Constant memory usage
// - Immediate transmission start
// - Can stream infinite/indefinite content
// - Backpressure-aware
// Cons:
// - No automatic Content-Length
// - Stream consumed on error
// - Higher per-chunk overhead
// - Requires error handling in stream
Key insight: The choice between from_bytes and wrap_stream is fundamentally about resource management. from_bytes trades memory for simplicity—you know the entire content upfront, the HTTP layer can set headers correctly, and retries are straightforward. wrap_stream trades simplicity for memory efficiency—you can send arbitrarily large or dynamically-generated content with bounded memory, but you lose automatic headers and retry simplicity. For most applications, a hybrid approach works best: use from_bytes for small payloads (typically under 10MB) and wrap_stream for large files, real-time data, or when memory is constrained. The threshold depends on your environment's memory limits and performance requirements.