How does tower::buffer::Buffer provide capacity-limited middleware for backpressure?
tower::buffer::Buffer wraps a Service with a bounded request queue and a dedicated worker task, providing capacity-limited buffering that creates backpressure when the queue is full so the underlying service is not overwhelmed. When capacity is exhausted, poll_ready returns Pending, which propagates backpressure to callers and prevents unbounded memory growth. Because the resulting handle is cheaply cloneable, Buffer also lets many tasks share a single underlying service.
The Problem: Unbounded Request Accumulation
use tower::Service;
use std::future::Future;
use std::pin::Pin;

// Without Buffer, a service might accumulate unlimited requests
async fn without_buffer() {
    // Imagine a slow database service.
    // If requests arrive faster than they can be processed:
    // - Memory grows unboundedly
    // - Latency increases
    // - Eventually the system crashes
    // Services need a way to say "I'm busy, slow down".
    // This is backpressure.
}

Without capacity limits, fast request arrival can overwhelm slow services.
Buffer's Capacity-Limited Design
use tower::buffer::Buffer;
use tower::Service;
use std::time::Duration;
use tokio::time::sleep;
// Buffer wraps a service with a bounded queue
async fn buffer_basics() {
    // A stand-in slow service: one second per request, built with service_fn
    let slow_service = tower::service_fn(|req: String| async move {
        sleep(Duration::from_secs(1)).await;
        Ok::<_, std::convert::Infallible>(req)
    });
    // Wrap with Buffer, capacity of 10 requests
    let buffered = Buffer::new(slow_service, 10);
    // Now:
    // - Up to 10 requests can be queued
    // - The 11th request will wait (backpressure)
    // - The worker task hands requests to the underlying service one at a time
    let _ = buffered;
}

Buffer::new creates a bounded queue between callers and the underlying service.
How Buffer Implements Backpressure
use tower::buffer::Buffer;
use tower::Service;
use std::task::{Context, Poll};

// A simplified sketch of Buffer's Service implementation
impl<S, T> Service<T> for Buffer<S, T>
where
    S: Service<T>,
{
    type Response = S::Response;
    // Note: the real Buffer boxes errors as tower::BoxError so that it can
    // also surface worker failures; S::Error is shown here for simplicity.
    type Error = S::Error;
    type Future = ResponseFuture<S::Future>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Key mechanism: poll_ready checks buffer capacity.
        // If the buffer has space:
        // - Returns Poll::Ready(Ok(()))
        // - Caller can send a request
        // If the buffer is full:
        // - Returns Poll::Pending
        // - Caller waits (backpressure!)
        // When a slot opens up, the waiting caller is woken via cx.waker().
    }

    fn call(&mut self, request: T) -> Self::Future {
        // Sends the request into the bounded channel and returns a future
        // that completes when the worker has run it through the service.
    }
}

poll_ready returns Pending when the buffer is full, propagating backpressure.
Bounded Channel Architecture
use tower::buffer::Buffer;

// Internally, Buffer uses a bounded MPSC channel
fn buffer_architecture() {
    // ┌──────────────────────────────────────────────────────────────┐
    // │                     Buffer Architecture                      │
    // ├──────────────────────────────────────────────────────────────┤
    // │                                                              │
    // │  Caller        Buffer (bounded channel)        Worker Task   │
    // │  ──────        ────────────────────────        ───────────   │
    // │                                                              │
    // │  call(req) ──▶ [req1, req2, ..., reqN] ──▶ service.call()    │
    // │                           │                                  │
    // │                     capacity: N                              │
    // │                                                              │
    // │  poll_ready() ──▶ checks if the channel has room             │
    // │                                                              │
    // │  When full:                                                  │
    // │    poll_ready() returns Pending ──▶ caller waits             │
    // │                                                              │
    // └──────────────────────────────────────────────────────────────┘
    // The worker task:
    // 1. Pulls requests from the channel
    // 2. Calls the underlying service
    // 3. Sends responses back to callers
}

Buffer spawns a worker task that pulls from a bounded channel.
Basic Usage Example
use tower::buffer::Buffer;
use tower::{Service, ServiceExt};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::time::sleep;

// A slow service for demonstration
struct SlowService;

impl Service<String> for SlowService {
    type Response = String;
    type Error = String;
    type Future = Pin<Box<dyn Future<Output = Result<String, String>> + Send>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: String) -> Self::Future {
        Box::pin(async move {
            sleep(Duration::from_millis(100)).await; // Simulate slow processing
            Ok(format!("Processed: {}", req))
        })
    }
}

async fn basic_usage() {
    // Wrap with capacity of 3 (Buffer::new spawns its worker task, so this
    // must run inside a tokio runtime)
    let mut buffer = Buffer::new(SlowService, 3);
    // ready() drives poll_ready until the buffer has room
    buffer.ready().await.unwrap();
    // Send a request
    let response = buffer.call("request1".to_string()).await.unwrap();
    println!("{}", response);
}

Create a Buffer with a capacity limit; poll_ready checks availability.
Backpressure in Action
use tower::buffer::Buffer;
use tower::limit::ConcurrencyLimit;
use tower::{Service, ServiceExt};
use std::time::Duration;
use tokio::time::timeout;

async fn backpressure_demo() {
    // Buffer's worker dispatches requests as fast as the inner service
    // reports readiness, so to fill the queue we gate readiness with a
    // concurrency limit of one in-flight request.
    let slow_service = ConcurrencyLimit::new(SlowService, 1);
    // Capacity of only 2
    let mut buffer = Buffer::new(slow_service, 2);
    // Fill the buffer
    buffer.ready().await.unwrap();
    let _fut1 = buffer.call("req1".to_string()); // dispatched, in flight
    buffer.ready().await.unwrap();
    let _fut2 = buffer.call("req2".to_string()); // queued
    buffer.ready().await.unwrap();
    let _fut3 = buffer.call("req3".to_string()); // queued; buffer now full
    // poll_ready will return Pending until a slot opens.
    // Try to get ready for another request, with a timeout:
    let result = timeout(Duration::from_millis(50), buffer.ready()).await;
    // This times out because the buffer is full and req1 has not completed
    assert!(result.is_err());
    // Once _fut1 is driven to completion, a slot opens
    // and poll_ready returns Ready again.
}

When the buffer is full, poll_ready returns Pending, creating backpressure.
Capacity Selection Trade-offs
use tower::buffer::Buffer;

fn capacity_selection() {
    // ┌────────────────┬────────────────────────┬──────────────────────────┐
    // │ Capacity       │ Low value (e.g., 1-5)  │ High value (e.g., 100+)  │
    // ├────────────────┼────────────────────────┼──────────────────────────┤
    // │ Backpressure   │ Strong, immediate      │ Weak, delayed            │
    // │ Memory         │ Minimal                │ Higher                   │
    // │ Latency        │ May increase (waiting) │ Lower (more buffering)   │
    // │ Throughput     │ May be limited         │ Higher burst handling    │
    // │ Responsiveness │ Callers learn about    │ Callers may not learn    │
    // │                │ overload fast          │ about overload           │
    // └────────────────┴────────────────────────┴──────────────────────────┘
    // Choose capacity based on:
    // 1. Expected burst size
    // 2. Acceptable latency increase
    // 3. Memory constraints
    // 4. Desired backpressure strength
    // Common patterns:
    // - Capacity = 1: Maximum backpressure, minimal buffering
    // - Capacity = connection pool size: Match active connections
    // - Capacity = 10-100: Handle bursts, moderate backpressure
}

Lower capacity means stronger backpressure; higher capacity handles bursts but delays backpressure.
Buffer with Other Middleware
use tower::buffer::Buffer;
use tower::limit::ConcurrencyLimit;

async fn with_other_middleware() {
    // Buffer can be combined with other middleware
    let service = SlowService;
    // Layer 1: Concurrency limit (max concurrent requests)
    let service = ConcurrencyLimit::new(service, 5);
    // Layer 2: Buffer (queue in front of the concurrency limit)
    let service = Buffer::new(service, 20);
    // Order matters:
    // - Requests enter Buffer (capacity 20)
    // - Requests pass to ConcurrencyLimit (max 5 concurrent)
    // - If ConcurrencyLimit is at its limit, the Buffer's queue fills up
    // - When the Buffer is full, callers get backpressure
    // This creates a two-stage pipeline:
    //   Caller -> Buffer(20) -> ConcurrencyLimit(5) -> Service
    let _ = service;
}

// Alternative ordering:
async fn alternative_ordering() {
    // Buffer inside the concurrency limit
    let service = SlowService;
    let service = Buffer::new(service, 10);
    let service = ConcurrencyLimit::new(service, 5);
    // This order:
    // - ConcurrencyLimit admits at most 5 callers into the shared buffer at once
    // - The buffer's capacity of 10 is then rarely the binding limit
    // - Different semantics!
    let _ = service;
}

Buffer placement in the middleware stack affects backpressure behavior.
Thread Safety and Cloning
use tower::buffer::Buffer;
use tower::{Service, ServiceExt};

async fn thread_safety() {
    // Buffer is Clone, creating shared access to the same underlying queue
    let service = SlowService;
    let buffer = Buffer::new(service, 10);
    // Clone creates a new handle to the same buffer
    let buffer2 = buffer.clone();
    // Both handles share the same bounded channel
    // Both contribute to the same capacity limit
    let _ = buffer2;
    // This enables multiple callers to share one buffer:
    let mut handles = vec![];
    for i in 0..3 {
        let mut b = buffer.clone();
        handles.push(tokio::spawn(async move {
            b.ready().await.unwrap();
            b.call(format!("request {}", i)).await.unwrap()
        }));
    }
    // All three requests go to the same buffer
    // Capacity limit applies to total queued requests
}

Buffer implements Clone, allowing multiple callers to share the same queue.
Error Handling
use tower::buffer::Buffer;
use tower::{Service, ServiceExt};

async fn error_handling() {
    let service = SlowService;
    let mut buffer = Buffer::new(service, 5);
    // Buffer boxes errors as tower::BoxError so that one error type can
    // carry both service errors and worker failures.
    // If the underlying service fails a request:
    // - That request's caller receives the (boxed) error
    // - The buffer continues operating
    // - Other queued requests are unaffected
    // If the underlying service fails fatally (an error from its poll_ready):
    // - The worker shuts down
    // - Pending and future requests fail with a ServiceError
    // If every Buffer handle is dropped:
    // - The worker shuts down; requests still waiting resolve with a
    //   Closed error
    // Safe pattern: always drive ready() before call()
    buffer.ready().await.unwrap(); // Check capacity
    let result = buffer.call("request".to_string()).await; // Send request
    let _ = result;
}

Errors from the underlying service propagate to the caller; buffer-level errors indicate worker failure or shutdown.
Comparison with ConcurrencyLimit
use tower::buffer::Buffer;
use tower::limit::ConcurrencyLimit;
use tower::load_shed::LoadShed;

fn comparing_middleware() {
    // ┌──────────────────┬─────────────────┬──────────────────────────────┐
    // │ Middleware       │ Mechanism       │ Behavior when full           │
    // ├──────────────────┼─────────────────┼──────────────────────────────┤
    // │ Buffer           │ Bounded queue   │ Returns Pending (waits)      │
    // │ ConcurrencyLimit │ Semaphore       │ Returns Pending (waits)      │
    // │ LoadShed         │ Readiness check │ Returns error (rejects now)  │
    // └──────────────────┴─────────────────┴──────────────────────────────┘
    // Buffer:
    // - Queues requests up to capacity
    // - Backpressures via Pending
    // - Good for: absorbing bursts, smoothing load
    // ConcurrencyLimit:
    // - Limits concurrent in-flight requests
    // - Backpressures via Pending
    // - Good for: limiting resource usage
    // LoadShed:
    // - Rejects requests whenever the inner service is not ready
    // - No waiting, immediate error
    // - Good for: fail-fast scenarios
    // Combine them:
    //   Buffer -> ConcurrencyLimit -> LoadShed -> Service
    // Queues first, then limits concurrency, then rejects if truly overloaded
}

Buffer queues and waits; LoadShed rejects immediately; ConcurrencyLimit limits in-flight requests.
Real-World Pattern: HTTP Service
use tower::buffer::Buffer;
use tower::{Service, ServiceExt};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::time::sleep;

async fn http_service_pattern() {
    // Simulating an HTTP handler pattern (request/response types from the
    // `http` crate)
    struct HttpHandler;

    impl Service<http::Request<String>> for HttpHandler {
        type Response = http::Response<String>;
        type Error = String;
        type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            Poll::Ready(Ok(()))
        }

        fn call(&mut self, _req: http::Request<String>) -> Self::Future {
            // Slow database query simulation
            Box::pin(async move {
                sleep(Duration::from_millis(100)).await;
                Ok(http::Response::builder()
                    .status(200)
                    .body("response".to_string())
                    .unwrap())
            })
        }
    }

    // Create buffered service with capacity
    let handler = HttpHandler;
    let service = Buffer::new(handler, 100);
    let _ = service;
    // In an HTTP server, this prevents:
    // - Unlimited request queueing
    // - Memory exhaustion under load
    // - Cascading failures
    // When the buffer is full:
    // - poll_ready returns Pending
    // - The HTTP framework should slow down accepting connections
    // - Backpressure propagates to the TCP layer
}

In HTTP services, Buffer prevents overload by creating backpressure that propagates to connection handling.
Monitoring Buffer Usage
use tower::buffer::Buffer;
use tower::{Service, ServiceExt};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Note: Buffer doesn't expose current usage directly.
// For monitoring, track usage externally.
async fn monitoring_pattern() {
    // Track buffer usage via request counting
    let capacity = 100;
    let service = SlowService;
    let buffer = Buffer::new(service, capacity);
    // External tracking for metrics:
    let in_flight = Arc::new(AtomicUsize::new(0));
    let _ = (buffer, in_flight);
}

// Capacity is passed in explicitly so the helper can warn near the limit.
async fn tracked_call<S>(
    buffer: &mut S,
    request: String,
    capacity: usize,
    in_flight: Arc<AtomicUsize>,
) -> Result<S::Response, S::Error>
where
    S: Service<String>,
{
    buffer.ready().await?;
    let before = in_flight.fetch_add(1, Ordering::SeqCst);
    // Log if approaching capacity
    if before >= capacity * 9 / 10 {
        eprintln!("Warning: buffer 90% full");
    }
    let result = buffer.call(request).await;
    in_flight.fetch_sub(1, Ordering::SeqCst);
    result
}

Track buffer usage externally since Buffer doesn't expose internal state.
Complete Summary
use tower::buffer::Buffer;

fn complete_summary() {
    // ┌───────────────┬────────────────────────────────────────────────────┐
    // │ Component     │ Description                                        │
    // ├───────────────┼────────────────────────────────────────────────────┤
    // │ Inner type    │ Service<T> to wrap                                 │
    // │ Capacity      │ Maximum queued requests (usize)                    │
    // │ Mechanism     │ Bounded MPSC channel + worker task                 │
    // │ poll_ready    │ Ready if the buffer has space, Pending if full     │
    // │ call          │ Sends request to buffer, returns future            │
    // │ Backpressure  │ Pending propagation when buffer is full            │
    // │ Thread safety │ Clone creates shared handle to same buffer         │
    // └───────────────┴────────────────────────────────────────────────────┘
    // Buffer provides:
    // 1. Capacity limiting
    //    - Bounded queue prevents unbounded memory growth
    //    - Configurable capacity based on requirements
    // 2. Backpressure propagation
    //    - poll_ready returns Pending when full
    //    - Callers wait instead of overwhelming the service
    //    - Backpressure propagates through the middleware stack
    // 3. Request queueing
    //    - Smooths burst traffic
    //    - Maintains FIFO ordering
    //    - Decouples caller timing from service timing
    // 4. Thread-safe sharing
    //    - Cloneable for multiple callers
    //    - Single shared queue
    //    - Single worker task
    // Use Buffer when:
    // - The service can't keep up with bursts
    // - You need to prevent memory exhaustion
    // - You want to propagate backpressure
    // - You need to smooth request arrival patterns
}
// Key insight:
// tower::buffer::Buffer implements backpressure through bounded capacity
// and poll_ready's Pending state. When the buffer is full, poll_ready
// returns Pending, signaling callers to wait. This propagates backpressure
// up the call stack, preventing resource exhaustion.
//
// The bounded channel architecture means:
// - Fixed memory usage (capacity Γ request size)
// - Clear signal when overloaded (Pending instead of rejection)
// - Natural flow control as callers slow down
//
// Choose capacity based on:
// - Expected burst sizes
// - Acceptable latency (higher capacity = more queuing)
// - Desired backpressure strength (lower = faster feedback)
// - Memory constraints
//
// Buffer is foundational for building resilient services that
// degrade gracefully under load rather than crashing or consuming
// unlimited resources.

Key insight: tower::buffer::Buffer provides capacity-limited backpressure by wrapping a Service with a bounded MPSC channel. When the buffer reaches capacity, poll_ready returns Pending, propagating backpressure to callers. This prevents unbounded memory growth and creates natural flow control: the system slows down as it approaches overload rather than crashing. The capacity parameter lets you tune the trade-off between burst handling (higher capacity) and backpressure responsiveness (lower capacity). Buffer is essential for building resilient services that degrade gracefully under load.
