How does tower::Service's poll_ready method enable backpressure in middleware chains?

tower::Service's poll_ready method enables backpressure by allowing services to signal their readiness to accept new requests before work is committed. When poll_ready returns Pending, the caller must wait before calling call, preventing the service from being overwhelmed. This creates a propagation chain: downstream services push back through poll_ready, middleware forwards this signal, and upstream callers eventually stop producing work. The mechanism ensures that load is naturally throttled at the source when downstream capacity is exhausted, without explicit coordination or dropped requests.

The Service Trait

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
 
pub trait Service<Request> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;
 
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    fn call(&mut self, req: Request) -> Self::Future;
}

poll_ready checks if the service can accept a request; call executes the request.

Backpressure Without poll_ready

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
 
// Hypothetical service without backpressure
struct UnboundedService {
    pending: Arc<AtomicUsize>,
    max_concurrent: usize,
}
 
impl UnboundedService {
    fn call(&self, req: String) -> impl std::future::Future<Output = String> {
        // Always accept, even if overloaded
        self.pending.fetch_add(1, Ordering::SeqCst);
        let pending = self.pending.clone();
        async move {
            // Process request
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            pending.fetch_sub(1, Ordering::SeqCst);
            req
        }
    }
}
 
// Problem: Requests pile up, memory grows, latency increases
// No mechanism to tell caller to slow down

Without poll_ready, services accept all requests until resources are exhausted.

Basic Backpressure Example

use tower::Service;
use std::task::{Context, Poll};
 
struct BoundedService<Inner> {
    inner: Inner,
    pending: usize,
    max_pending: usize,
}
 
impl<Inner, Request> Service<Request> for BoundedService<Inner>
where
    Inner: Service<Request>,
{
    type Response = Inner::Response;
    type Error = Inner::Error;
    type Future = Inner::Future;
 
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Check capacity before accepting
        if self.pending >= self.max_pending {
            // Signal: not ready, caller should wait.
            // Caveat: a real implementation must store cx.waker() and wake it
            // when a slot frees up, or this task may never be polled again.
            Poll::Pending
        } else {
            // Ready to accept; defer to the inner service's readiness
            self.inner.poll_ready(cx)
        }
    }

    fn call(&mut self, req: Request) -> Self::Future {
        // Caveat: `pending` must also be decremented when the response
        // completes; a real implementation wraps Inner::Future to do so.
        self.pending += 1;
        self.inner.call(req)
    }
}

poll_ready returns Pending when capacity is exhausted, preventing new requests.

Backpressure Propagation Chain

use tower::Service;
use std::task::{Context, Poll};
 
// Middleware that wraps an inner service
struct LoggingService<Inner> {
    inner: Inner,
}
 
impl<Inner, Request> Service<Request> for LoggingService<Inner>
where
    Inner: Service<Request>,
{
    type Response = Inner::Response;
    type Error = Inner::Error;
    type Future = Inner::Future;
 
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Forward backpressure from inner service
        println!("poll_ready: checking inner service");
        self.inner.poll_ready(cx)
    }
 
    fn call(&mut self, req: Request) -> Self::Future {
        println!("call: forwarding request");
        self.inner.call(req)
    }
}
 
// If inner.poll_ready() returns Pending, this middleware propagates Pending
// Backpressure flows up through all middleware layers

Each middleware forwards poll_ready from its inner service, propagating backpressure.

Concrete Rate-Limiting Service

use tower::Service;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
 
struct RateLimited<Inner> {
    inner: Inner,
    concurrent: Arc<AtomicUsize>,
    max_concurrent: usize,
}
 
impl<Inner, Request> Service<Request> for RateLimited<Inner>
where
    Inner: Service<Request>,
{
    type Response = Inner::Response;
    type Error = Inner::Error;
    type Future = Inner::Future;
 
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let current = self.concurrent.load(Ordering::Relaxed);

        if current >= self.max_concurrent {
            // At capacity: signal not ready; the caller must poll again later.
            // Waking ourselves immediately keeps the task alive but busy-polls;
            // production code stores the waker and wakes it when a slot frees.
            cx.waker().wake_by_ref();
            Poll::Pending
        } else {
            // Has capacity: also check the inner service
            self.inner.poll_ready(cx)
        }
    }

    fn call(&mut self, req: Request) -> Self::Future {
        // Caveat: the counter must be decremented when the response
        // completes (e.g. by a wrapper future); omitted here for brevity.
        self.concurrent.fetch_add(1, Ordering::Relaxed);
        self.inner.call(req)
    }
}

Rate limiting via poll_ready prevents overload without dropping requests.

Buffer Middleware and Backpressure

use tower::buffer::Buffer;
use tower::service_fn;

// Buffer creates a bounded channel between caller and service.
// It allows some requests to queue without blocking the caller.

async fn use_buffer() -> Result<(), Box<dyn std::error::Error>> {
    // A stand-in inner service; any Service works here
    let service = service_fn(|req: String| async move {
        Ok::<_, std::io::Error>(req)
    });

    // Buffer allows up to 10 queued requests
    let buffered = Buffer::new(service, 10);
    
    // poll_ready on Buffer:
    // - Returns Ready if channel has capacity
    // - Returns Pending if channel is full
    // - Internally queues requests
    
    // When inner service is slow:
    // - Buffer fills up
    // - poll_ready returns Pending
    // - Caller experiences backpressure
    
    Ok(())
}

Buffer provides bounded queueing with backpressure when full.

poll_ready and Async Call Sites

use tower::Service;
 
async fn drive_service<S, Request>(mut service: S, requests: Vec<Request>)
where
    S: Service<Request>,
    S::Response: std::fmt::Debug,
    S::Error: std::fmt::Debug,
{
    for req in requests {
        // Wait for the service to signal readiness:
        // this await is the backpressure mechanism in action
        std::future::poll_fn(|cx| service.poll_ready(cx))
            .await
            .expect("service failed");

        // Now safe to call
        let result = service.call(req).await;

        println!("Request completed: {:?}", result);
    }
}

The pattern: poll_ready then call, ensuring service never exceeds capacity.

Middleware Stack Example

use tower::Service;
use tower::ServiceBuilder;
use tower::limit::ConcurrencyLimitLayer;
use tower::load_shed::LoadShedLayer;
use tower::timeout::TimeoutLayer;
use std::time::Duration;

async fn build_service() {
    let inner = DatabaseService::new();

    // Stack of middleware, each propagating backpressure.
    // ServiceBuilder wraps top-down: the first layer is outermost.
    let service = ServiceBuilder::new()
        .layer(ConcurrencyLimitLayer::new(100))  // Limit concurrent requests
        .layer(TimeoutLayer::new(Duration::from_secs(30)))  // Timeout
        .layer(LoadShedLayer::new())  // Shed load when overwhelmed
        .service(inner);

    // Backpressure flow (outermost to innermost):
    // 1. ConcurrencyLimit checks its permit count
    // 2. Timeout forwards readiness, adding a deadline to calls
    // 3. LoadShed checks whether the inner service is overloaded
    // 4. DatabaseService checks its own capacity
    // Each layer's poll_ready calls the next layer's poll_ready
}
 
struct DatabaseService;
 
impl DatabaseService {
    fn new() -> Self { DatabaseService }
}
 
impl Service<String> for DatabaseService {
    type Response = String;
    type Error = std::io::Error;
    type Future = std::future::Ready<Result<String, std::io::Error>>;
 
    fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) 
        -> std::task::Poll<Result<(), Self::Error>> 
    {
        // Check database connection pool
        std::task::Poll::Ready(Ok(()))
    }
 
    fn call(&mut self, req: String) -> Self::Future {
        std::future::ready(Ok(format!("Processed: {}", req)))
    }
}

Each layer's poll_ready propagates signals from downstream to upstream.

Load Shedding vs Backpressure

use tower::Service;
use std::task::{Context, Poll};
 
// Load shedding: reject requests when overloaded
struct LoadShed<Inner> {
    inner: Inner,
    overloaded: bool,
}
 
impl<Inner, Request> Service<Request> for LoadShed<Inner>
where
    Inner: Service<Request>,
{
    type Response = Inner::Response;
    type Error = Inner::Error;
    type Future = Inner::Future;
 
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        if self.overloaded {
            // Signal error: service cannot accept requests.
            // This makes the caller fail fast instead of waiting.
            // (Sketch only: tower's real LoadShed boxes errors so it can
            // return its own Overloaded error here.)
            Poll::Ready(Err(/* overload error */))
        } else {
            self.inner.poll_ready(cx)
        }
    }
 
    fn call(&mut self, req: Request) -> Self::Future {
        self.inner.call(req)
    }
}
 
// Backpressure: caller waits when service is busy
// Load shedding: caller gets error when service is busy

Poll::Ready(Err(...)) signals failure; Poll::Pending signals wait.

Concurrency Limit Implementation

use tower::Service;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use std::collections::VecDeque;
 
struct ConcurrencyLimit<Inner> {
    inner: Inner,
    current: Arc<AtomicUsize>,
    limit: usize,
    waiters: VecDeque<std::task::Waker>,
}
 
impl<Inner, Request> Service<Request> for ConcurrencyLimit<Inner>
where
    Inner: Service<Request>,
{
    type Response = Inner::Response;
    type Error = Inner::Error;
    type Future = Inner::Future;
 
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let current = self.current.load(Ordering::Acquire);
        
        if current < self.limit {
            // Capacity available: ready to accept.
            // (A full implementation would also check inner.poll_ready here.)
            Poll::Ready(Ok(()))
        } else {
            // At limit: park this caller until a request completes
            self.waiters.push_back(cx.waker().clone());
            Poll::Pending
        }
    }
 
    fn call(&mut self, req: Request) -> Self::Future {
        self.current.fetch_add(1, Ordering::AcqRel);
        self.inner.call(req)
    }
}
 
// When a request completes:
// 1. Decrement counter
// 2. Wake one waiting task
// 3. Waiting task's poll_ready succeeds

Concurrency limits use poll_ready to queue callers when at capacity.

Tower's Service Call Pattern

use tower::Service;

async fn call_service<S, Request>(service: &mut S, request: Request)
    -> Result<S::Response, S::Error>
where
    S: Service<Request>,
{
    // The canonical pattern:
    // 1. Wait for readiness; per poll_ready's contract, calling before
    //    readiness may panic or misbehave
    std::future::poll_fn(|cx| service.poll_ready(cx)).await?;

    // 2. Call the service to obtain the response future
    // Note: some services allow several calls after one readiness check;
    // others require a fresh poll_ready before each call
    let future = service.call(request);

    // 3. Wait for the response
    future.await
}

The poll_ready then call pattern is the foundation of backpressure.

poll_ready in Hyper

use hyper::service::service_fn;
use hyper::{Body, Request, Response};
use std::convert::Infallible;

async fn serve() {
    // Hyper's server drives the Service trait. For each request:
    // 1. Accept the connection
    // 2. Read the request
    // 3. Call poll_ready on the service
    // 4. If Pending, wait before calling
    // 5. Call the service with the request

    let service = service_fn(|_req: Request<Body>| async {
        Ok::<_, Infallible>(Response::new(Body::from("Hello")))
    });

    // Hyper manages backpressure automatically: if the service's
    // poll_ready returns Pending, hyper stops reading further
    // requests from the connection until it becomes ready
}

Hyper uses poll_ready for automatic backpressure.

Combining poll_ready with Channels

use tower::Service;
use tokio::sync::mpsc;
 
struct ChannelService {
    sender: mpsc::Sender<String>,
    pending: usize,
    max_pending: usize,
}
 
impl Service<String> for ChannelService {
    type Response = ();
    // try_send fails with TrySendError when the channel is full or closed
    type Error = mpsc::error::TrySendError<String>;
    type Future = std::future::Ready<Result<(), Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>)
        -> std::task::Poll<Result<(), Self::Error>>
    {
        if self.pending >= self.max_pending {
            // Channel full: apply backpressure.
            // (Sketch caveat: a real implementation must arrange a wake-up,
            // e.g. via tokio_util's PollSender, before returning Pending.)
            std::task::Poll::Pending
        } else {
            std::task::Poll::Ready(Ok(()))
        }
    }

    fn call(&mut self, req: String) -> Self::Future {
        self.pending += 1;
        std::future::ready(self.sender.try_send(req))
    }
}

Bounded channels naturally provide backpressure through capacity limits.

Error vs Pending Semantics

use tower::Service;
use std::task::{Context, Poll};
 
// Poll::Ready(Ok(()))  -> Service ready, call will succeed
// Poll::Ready(Err(_))  -> Service failed, call returns error
// Poll::Pending       -> Service busy, wait and retry
 
struct ServiceWithErrors<Inner> {
    inner: Inner,
    is_broken: bool,
}
 
impl<Inner, Request> Service<Request> for ServiceWithErrors<Inner>
where
    Inner: Service<Request>,
{
    type Response = Inner::Response;
    type Error = Inner::Error;
    type Future = Inner::Future;
 
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        if self.is_broken {
            // Permanent failure: return an error so the caller stops retrying.
            // (Sketch only: constructing the error value requires a concrete
            // error type, elided here.)
            Poll::Ready(Err(/* permanent error */))
        } else {
            // Forward to inner
            self.inner.poll_ready(cx)
        }
    }
 
    fn call(&mut self, req: Request) -> Self::Future {
        self.inner.call(req)
    }
}

Different Poll variants signal different states.

Real-World Example: Database Pool

use tower::Service;
use sqlx::PgPool;
use std::task::{Context, Poll};
 
struct DatabaseService {
    pool: PgPool,
    max_connections: u32,
}
 
impl Service<String> for DatabaseService {
    type Response = String;
    type Error = sqlx::Error;
    type Future = std::future::Ready<Result<String, sqlx::Error>>;
 
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Check whether the pool has connections to spare
        if self.pool.num_idle() == 0 && self.pool.size() >= self.max_connections {
            // Pool exhausted: backpressure.
            // (Sketch caveat: a real implementation must register a waker
            // so this task is re-polled when a connection is returned.)
            Poll::Pending
        } else {
            // Pool has capacity
            Poll::Ready(Ok(()))
        }
    }

    fn call(&mut self, query: String) -> Self::Future {
        // Sketch only: a real implementation would execute the query
        // against the pool and return its future
        std::future::ready(Ok(format!("Result of: {}", query)))
    }
}

Database pools use poll_ready to apply backpressure when exhausted.

Backpressure Through Layers Summary

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚            Caller (client)              β”‚
β”‚   waits on poll_ready before call       β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
              β”‚ poll_ready
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚           Timeout Layer                 β”‚
β”‚   forwards poll_ready, adds timeout     β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
              β”‚ poll_ready
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚        Concurrency Limit                β”‚
β”‚   returns Pending if at capacity        β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
              β”‚ poll_ready
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚           Buffer Layer                  β”‚
β”‚   queues requests, backs up when full   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
              β”‚ poll_ready
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚         Inner Service                   β”‚
β”‚   core logic, may return Pending        β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Backpressure propagates through the middleware stack.

Synthesis

poll_ready enables backpressure by inverting control: instead of services accepting all requests and potentially failing, services advertise readiness and callers wait:

The mechanism: poll_ready returns Ready(Ok(())) when the service can accept a request, Ready(Err(...)) when the service has failed, or Pending when busy. Callers must wait on Pending before calling call. This creates a natural flow control: busy services return Pending, callers wait, and load stops at the source.

Middleware propagation: Each middleware layer's poll_ready calls its inner service's poll_ready. When the innermost service returns Pending, this propagates through all layers. The entire middleware chain becomes backpressured together. This is compositionally powerful: add a concurrency limit, it naturally applies backpressure to everything above it.

Key insight: Backpressure through poll_ready is cooperative, not preemptive. It relies on callers checking readiness before making requests. In frameworks like Tower and Hyper, this pattern is enforced through service call abstractions. The alternativeβ€”load sheddingβ€”rejects requests explicitly with errors, which pushes failure handling to callers but avoids waiting. Backpressure with poll_ready queues requests in the caller's task, which uses memory but avoids dropped work. Choose based on whether you want callers to wait (backpressure) or handle failures (load shedding).