How does tower::buffer::Buffer provide capacity-limited middleware for backpressure?

tower::buffer::Buffer wraps a Service and provides capacity-limited buffering of requests, creating backpressure when the buffer is full to prevent overwhelming the underlying service. It uses a bounded channel to queue requests, returning Pending when capacity is exhausted, which propagates backpressure to callers and prevents resource exhaustion.

The Problem: Unbounded Request Accumulation

use tower::Service;
use std::future::Future;
use std::pin::Pin;
 
// Without Buffer, a service might accumulate unlimited requests
 
/// Illustrates the failure mode Buffer exists to prevent: with no bound
/// on queued work, a slow service lets memory and latency grow without
/// limit under sustained load.
async fn without_buffer() {
    // Imagine a slow database service.
    // If requests arrive faster than they can be processed:
    // - Memory grows unboundedly
    // - Latency increases
    // - Eventually the system crashes
    
    // Services need a way to say "I'm busy, slow down".
    // This is backpressure.
}

Without capacity limits, fast request arrival can overwhelm slow services.

Buffer's Capacity-Limited Design

use tower::buffer::Buffer;
use tower::Service;
use std::time::Duration;
use tokio::time::sleep;
 
// Buffer wraps a service with a bounded queue
 
async fn buffer_basics() {
    // Create a service that takes 1 second per request
    let slow_service = // ... some slow service
    
    // Wrap with Buffer, capacity of 10 requests
    let buffered = Buffer::new(slow_service, 10);
    
    // Now:
    // - Up to 10 requests can be queued
    // - The 11th request will wait (backpressure)
    // - The underlying service processes one at a time
}

Buffer::new creates a bounded queue between callers and the underlying service.

How Buffer Implements Backpressure

use tower::buffer::Buffer;
use tower::Service;
use std::future::Future;
use std::pin::Pin;
 
// Buffer's Service implementation
 
impl<S, T> Service<T> for Buffer<S, T>
where
    S: Service<T>,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = ResponseFuture<S::Future>;
    
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Key mechanism: poll_ready checks buffer capacity
        
        // If buffer has space:
        // - Returns Poll::Ready(Ok(()))
        // - Caller can send request
        
        // If buffer is full:
        // - Returns Poll::Pending
        // - Caller waits (backpressure!)
        
        // When a slot opens up:
        // - Wakes the waiting caller
    }
    
    fn call(&mut self, request: T) -> Self::Future {
        // Sends request to the buffer
        // Returns a future that completes when the service processes it
    }
}

poll_ready returns Pending when the buffer is full, propagating backpressure.

Bounded Channel Architecture

use tower::buffer::Buffer;
 
// Internally, Buffer uses a bounded MPSC channel
 
fn buffer_architecture() {
    // ┌──────────────────────────────────────────────────────────────────────┐
    // │                         Buffer Architecture                          │
    // ├──────────────────────────────────────────────────────────────────────
    // │                                                                      │
    // │    Caller          Buffer (bounded channel)      Worker Task        │
    // │    ─────────       ──────────────────────      ─────────────       │
    // │                                                                      │
    // │    call(req)  ───►  [req1, req2, ..., reqN]  ───►  service.call()   │
    // │                     ↑                                                │
    // │                     │                                                │
    // │                capacity: N                                           │
    // │                                                                      │
    // │    poll_ready() ───► Checks if channel has room                      │
    // │                                                                      │
    // │    When full:                                                        │
    // │    poll_ready() returns Pending  ───►  Caller waits                 │
    // │                                                                      │
    // └──────────────────────────────────────────────────────────────────────┘
    
    // The worker task:
    // 1. Pulls requests from the channel
    // 2. Calls the underlying service
    // 3. Sends responses back to callers
    //
    // NOTE(review): recent tower versions enforce capacity with a
    // semaphore in front of an unbounded channel rather than a literal
    // bounded channel — observable behavior is the same; verify against
    // the tower version in use.
}

Buffer spawns a worker task that pulls from a bounded channel.

Basic Usage Example

use tower::buffer::Buffer;
use tower::{Service, ServiceExt};
use std::time::Duration;
use tokio::time::sleep;
 
// A slow service for demonstration
/// Toy service for the examples: every request takes ~100 ms and
/// echoes its input back with a "Processed:" prefix.
struct SlowService;
 
impl Service<String> for SlowService {
    type Response = String;
    type Error = String;
    // Boxed future so `call` can return an async block.
    // (Requires `Pin`, `Future`, `Context`, and `Poll` in scope — this
    // snippet's `use` list omits them for brevity.)
    type Future = Pin<Box<dyn Future<Output = Result<String, String>> + Send>>;
    
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Always ready: this service applies no backpressure of its own,
        // which is exactly why wrapping it in Buffer is instructive.
        Poll::Ready(Ok(()))
    }
    
    fn call(&mut self, req: String) -> Self::Future {
        Box::pin(async move {
            sleep(Duration::from_millis(100)).await;  // Simulate slow processing
            Ok(format!("Processed: {}", req))
        })
    }
}
 
/// Minimal end-to-end use of Buffer: ready() then call().
async fn basic_usage() {
    // Wrap with capacity of 3.
    // NOTE(review): Buffer::new spawns its worker on the Tokio runtime,
    // so this must run inside one (e.g. under #[tokio::main]).
    let mut buffer = Buffer::new(SlowService, 3);
    
    // poll_ready checks if we can send
    // (`ready()` comes from tower::ServiceExt)
    buffer.ready().await.unwrap();  // Buffer has room
    
    // Send a request
    let response = buffer.call("request1".to_string()).await.unwrap();
    println!("{}", response);
}

Create a Buffer with a capacity limit; poll_ready checks availability.

Backpressure in Action

use tower::buffer::Buffer;
use tower::{Service, ServiceExt};
use std::time::Duration;
use tokio::time::{sleep, timeout};
 
/// Demonstrates poll_ready returning Pending when the buffer is full.
async fn backpressure_demo() {
    let slow_service = SlowService;
    
    // Capacity of only 2
    let mut buffer = Buffer::new(slow_service, 2);
    
    // Fill the buffer. Each ready()/call() pair reserves a slot and
    // enqueues the request; fut1/fut2 are the response futures, which
    // can be awaited later — the worker starts pulling work regardless.
    buffer.ready().await.unwrap();
    let fut1 = buffer.call("req1".to_string());
    
    buffer.ready().await.unwrap();
    let fut2 = buffer.call("req2".to_string());
    
    // Buffer is now full (2 requests queued)
    // poll_ready will return Pending until a slot opens
    
    // Try to send a third request with timeout
    let result = timeout(Duration::from_millis(50), buffer.ready()).await;
    
    // Intent: `result` is Err(Elapsed) because the buffer is full.
    // NOTE(review): whether this actually times out depends on when the
    // worker releases slots — in tower a slot frees once the worker
    // hands the request to the inner service, not when the response
    // completes. Verify against the tower version in use.
    
    // Once fut1 or fut2 completes, a slot opens
    // and poll_ready returns Ready
}

When the buffer is full, poll_ready returns Pending, creating backpressure.

Capacity Selection Trade-offs

use tower::buffer::Buffer;
 
fn capacity_selection() {
    // ┌──────────────┬────────────────────────┬─────────────────────────┐
    // │ Capacity     │ Low value (e.g. 1-5)   │ High value (e.g. 100+)  │
    // ├──────────────┼────────────────────────┼─────────────────────────┤
    // │ Backpressure │ Strong, immediate      │ Weak, delayed           │
    // │ Memory       │ Minimal                │ Higher                  │
    // │ Latency      │ May increase (waiting) │ Lower (more buffering)  │
    // │ Throughput   │ May be limited         │ Higher burst handling   │
    // │ Responsive-  │ Callers learn fast     │ Callers may not learn   │
    // │   ness       │ about overload         │ about overload          │
    // └──────────────┴────────────────────────┴─────────────────────────┘
    
    // Choose capacity based on:
    // 1. Expected burst size
    // 2. Acceptable latency increase
    // 3. Memory constraints
    // 4. Desired backpressure strength
    
    // Common patterns:
    // - Capacity = 1: Maximum backpressure, minimal buffering
    // - Capacity = connection pool size: Match active connections
    // - Capacity = 10-100: Handle bursts, moderate backpressure
}

Lower capacity means stronger backpressure; higher capacity handles bursts but delays backpressure.

Buffer with Other Middleware

use tower::buffer::Buffer;
use tower::limit::concurrency::ConcurrencyLimit;
use tower::ServiceExt;
 
/// Stacking Buffer outside ConcurrencyLimit: queue first, then limit.
/// (ConcurrencyLimit is also re-exported as tower::limit::ConcurrencyLimit.)
async fn with_other_middleware() {
    // Buffer can be combined with other middleware.
    
    let service = SlowService;
    
    // Layer 1: Concurrency limit (max concurrent requests)
    let service = ConcurrencyLimit::new(service, 5);
    
    // Layer 2: Buffer (queue before concurrency limit)
    let service = Buffer::new(service, 20);
    
    // Order matters:
    // - Requests enter Buffer (capacity 20)
    // - Requests pass to ConcurrencyLimit (max 5 concurrent)
    // - If ConcurrencyLimit is full, Buffer fills up
    // - When Buffer is full, callers get backpressure
    
    // This creates a two-stage pipeline:
    // Caller -> Buffer(20) -> ConcurrencyLimit(5) -> Service
}
 
// Alternative ordering:
async fn alternative_ordering() {
    // Buffer after concurrency limit
    let service = SlowService;
    let service = Buffer::new(service, 10);
    let service = ConcurrencyLimit::new(service, 5);
    
    // This order:
    // - ConcurrencyLimit gates callers first: at most 5 callers may be
    //   inside the limit at once
    // - Those (at most 5) callers then share the single 10-slot Buffer,
    //   so the buffer can never fill up from this path alone
    // - Different semantics from putting Buffer outermost!
}

Buffer placement in the middleware stack affects backpressure behavior.

Thread Safety and Cloning

use tower::buffer::Buffer;
use tower::Service;
use std::sync::Arc;
 
/// Demonstrates sharing one Buffer across tasks via Clone.
async fn thread_safety() {
    // Buffer is Clone: each clone is a handle onto the same shared
    // queue, so every handle draws from the one capacity limit.
    
    let service = SlowService;
    let buffer = Buffer::new(service, 10);
    
    // Clone creates a new handle to the same buffer
    let buffer2 = buffer.clone();
    
    // Both handles share the same bounded channel
    // Both contribute to the same capacity limit
    
    // Multiple callers can therefore share one buffer — spawn a task
    // per request, each holding its own handle:
    let handles: Vec<_> = (0..3)
        .map(|i| {
            let mut handle = buffer.clone();
            tokio::spawn(async move {
                handle.ready().await.unwrap();
                handle.call(format!("request {}", i)).await.unwrap()
            })
        })
        .collect();
    
    // All three requests go to the same buffer
    // Capacity limit applies to total queued requests
}

Buffer implements Clone, allowing multiple callers to share the same queue.

Error Handling

use tower::buffer::Buffer;
use tower::{Service, ServiceExt};
 
/// How errors surface through a Buffer.
async fn error_handling() {
    let service = SlowService;
    let mut buffer = Buffer::new(service, 5);
    
    // Errors from the underlying service propagate to callers.
    
    // If a response future from the underlying service fails:
    // - That specific request returns the error
    // - The buffer continues operating
    // - Other queued requests are unaffected
    // NOTE(review): if the inner service's *poll_ready* errors, tower's
    // worker shuts down and fails all pending requests — verify against
    // the tower version in use.
    
    // If the worker task crashes (unusual):
    // - poll_ready returns Err (a boxed "service closed" error)
    // - All pending requests fail
    
    // If buffer is dropped:
    // - Pending requests get cancellation
    // - Their futures resolve with an error
    
    // Safe pattern: always check ready() before call()
    buffer.ready().await.unwrap();  // Check capacity
    let result = buffer.call("request".to_string()).await;  // Send request
}

Errors from the underlying service propagate; buffer errors indicate capacity or worker issues.

Comparison with ConcurrencyLimit

use tower::buffer::Buffer;
use tower::limit::concurrency::ConcurrencyLimit;
use tower::load_shed::LoadShed;
 
fn comparing_middleware() {
    // ┌─────────────────┬────────────────┬────────────────────────────────┐
    // │ Middleware      │ Mechanism      │ Behavior When Full             │
    // ├─────────────────┼────────────────┼────────────────────────────────┤
    // │ Buffer          │ Bounded queue  │ Returns Pending (waits)        │
    // │ ConcurrencyLimit│ Semaphore      │ Returns Pending (waits)        │
    // │ LoadShed        │ Capacity check │ Returns error (rejects now)    │
    // └─────────────────┴────────────────┴────────────────────────────────┘
    
    // Buffer:
    // - Queues requests up to capacity
    // - Backpressures via Pending
    // - Good for: absorbing bursts, smoothing load
    
    // ConcurrencyLimit:
    // - Limits concurrent in-flight requests
    // - Backpressures via Pending
    // - Good for: limiting resource usage
    
    // LoadShed:
    // - Rejects requests when overloaded (an "overloaded" error)
    // - No waiting, immediate error
    // - Good for: fail-fast scenarios
    
    // Combine them:
    // Buffer -> ConcurrencyLimit -> LoadShed -> Service
    // Queues first, then limits concurrency, then rejects if truly overloaded
}

Buffer queues and waits; LoadShed rejects immediately; ConcurrencyLimit limits in-flight requests.

Real-World Pattern: HTTP Service

use tower::buffer::Buffer;
use tower::{Service, ServiceExt};
 
/// Real-world shape: an HTTP handler wrapped in a Buffer.
/// NOTE: `http` is a third-party crate; this snippet also assumes
/// Pin/Future/Context/Poll, sleep, and Duration are in scope.
async fn http_service_pattern() {
    // Simulating an HTTP handler pattern
    
    struct HttpHandler;
    
    impl Service<http::Request<String>> for HttpHandler {
        type Response = http::Response<String>;
        type Error = String;
        type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
        
        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            // Handler itself is always ready; the Buffer in front of it
            // supplies the backpressure.
            Poll::Ready(Ok(()))
        }
        
        fn call(&mut self, req: http::Request<String>) -> Self::Future {
            // Slow database query simulation
            Box::pin(async move {
                sleep(Duration::from_millis(100)).await;
                Ok(http::Response::builder()
                    .status(200)
                    .body("response".to_string())
                    .unwrap())
            })
        }
    }
    
    // Create buffered service with capacity
    let handler = HttpHandler;
    let mut service = Buffer::new(handler, 100);
    
    // In an HTTP server, this prevents:
    // - Unlimited request queueing
    // - Memory exhaustion under load
    // - Cascading failures
    
    // When buffer is full:
    // - poll_ready returns Pending
    // - HTTP framework should slow down accepting connections
    // - Backpressure propagates to the TCP layer
}

In HTTP services, Buffer prevents overload by creating backpressure that propagates to connection handling.

Monitoring Buffer Usage

use tower::buffer::Buffer;
 
// Note: Buffer doesn't expose current usage directly
// For monitoring, track externally
 
/// Track approximate buffer usage externally with an atomic counter.
/// NOTE: also requires `tower::{Service, ServiceExt}`, `std::sync::Arc`,
/// and `std::sync::atomic::{AtomicUsize, Ordering}` in scope.
async fn monitoring_pattern() {
    // Track buffer usage via request counting
    
    let capacity = 100;
    let service = SlowService;
    let buffer = Buffer::new(service, capacity);
    
    // External tracking for metrics:
    let in_flight = Arc::new(AtomicUsize::new(0));
    
    // A nested `fn` cannot capture outer locals in Rust, so `capacity`
    // must be passed in explicitly (the original sketch read it from
    // the enclosing scope, which does not compile).
    async fn tracked_call<S>(
        buffer: &mut S,
        request: String,
        in_flight: Arc<AtomicUsize>,
        capacity: usize,
    ) -> Result<S::Response, S::Error>
    where
        S: Service<String>,
    {
        // Wait for a free slot before counting the request as in-flight.
        buffer.ready().await?;
        let before = in_flight.fetch_add(1, Ordering::SeqCst);
        
        // Log if approaching capacity (counter is approximate — it also
        // counts requests the worker is currently processing).
        if before > capacity * 9 / 10 {
            eprintln!("Warning: buffer 90% full");
        }
        
        let result = buffer.call(request).await;
        in_flight.fetch_sub(1, Ordering::SeqCst);
        result
    }
}

Track buffer usage externally since Buffer doesn't expose internal state.

Complete Summary

use tower::buffer::Buffer;
 
fn complete_summary() {
    // ┌────────────────┬────────────────────────────────────────────────────┐
    // │ Component      │ Description                                        │
    // ├────────────────┼────────────────────────────────────────────────────┤
    // │ Inner type     │ Service<T> to wrap                                 │
    // │ Capacity       │ Maximum queued requests (usize)                    │
    // │ Mechanism      │ Bounded MPSC channel + worker task                 │
    // │ poll_ready     │ Ready if buffer has space, Pending if full         │
    // │ call           │ Sends request to buffer, returns future            │
    // │ Backpressure   │ Pending propagation when buffer is full            │
    // │ Thread safety  │ Clone creates shared handle to same buffer         │
    // └────────────────┴────────────────────────────────────────────────────┘
    
    // Buffer provides:
    
    // 1. Capacity Limiting
    //    - Bounded queue prevents unbounded memory growth
    //    - Configurable capacity based on requirements
    
    // 2. Backpressure Propagation
    //    - poll_ready returns Pending when full
    //    - Callers wait instead of overwhelming service
    //    - Backpressure propagates through middleware stack
    
    // 3. Request Queueing
    //    - Smooths burst traffic
    //    - Maintains ordering (FIFO)
    //    - Decouples caller timing from service timing
    
    // 4. Thread-Safe Sharing
    //    - Cloneable for multiple callers
    //    - Single shared queue
    //    - Single worker task
    
    // Use Buffer when:
    // - Service can't keep up with bursts
    // - Need to prevent memory exhaustion
    // - Want to propagate backpressure
    // - Need to smooth request arrival patterns
}
 
// Key insight:
// tower::buffer::Buffer implements backpressure through bounded capacity
// and poll_ready's Pending state. When the buffer is full, poll_ready
// returns Pending, signaling callers to wait. This propagates backpressure
// up the call stack, preventing resource exhaustion.
//
// The bounded channel architecture means:
// - Fixed memory usage (capacity Γ— request size)
// - Clear signal when overloaded (Pending instead of rejection)
// - Natural flow control as callers slow down
//
// Choose capacity based on:
// - Expected burst sizes
// - Acceptable latency (higher capacity = more queuing)
// - Desired backpressure strength (lower = faster feedback)
// - Memory constraints
//
// Buffer is foundational for building resilient services that
// degrade gracefully under load rather than crashing or consuming
// unlimited resources.

Key insight: tower::buffer::Buffer provides capacity-limited backpressure by wrapping a Service with a bounded MPSC channel. When the buffer reaches capacity, poll_ready returns Pending, propagating backpressure to callers. This prevents unbounded memory growth and creates natural flow controlβ€”the system slows down as it approaches overload rather than crashing. The capacity parameter lets you tune the trade-off between burst handling (higher capacity) and backpressure responsiveness (lower capacity). Buffer is essential for building resilient services that degrade gracefully under load.