How does tower::Service::poll_ready indicate readiness before processing a request?

tower::Service::poll_ready is an asynchronous readiness check: it returns Poll::Ready(Ok(())) when the service can accept a request and Poll::Pending when the service is at capacity. Because each middleware layer forwards the check to the service it wraps, backpressure propagates from downstream services through the middleware stack to upstream callers. This design lets a service signal that it is overloaded, rate-limited, or waiting for resources before the caller commits to sending a request.

The Service Trait and poll_ready

use tower::Service;
use std::future::Future;
use std::task::{Context, Poll};
 
// Simplified Service trait:
pub trait Service<Request> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;
 
    // Check if the service is ready:
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    
    // Process the request:
    fn call(&mut self, req: Request) -> Self::Future;
}

poll_ready must return Ready(Ok(())) before call is invoked, and this must happen once per request.

Basic Readiness Pattern

use tower::Service;
use std::task::{Context, Poll};
use std::collections::VecDeque;
 
struct QueueService {
    queue: VecDeque<String>,
    max_capacity: usize,
}
 
impl Service<String> for QueueService {
    type Response = ();
    type Error = &'static str;
    type Future = std::future::Ready<Result<(), Self::Error>>;
 
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Ready if queue has capacity:
        if self.queue.len() < self.max_capacity {
            Poll::Ready(Ok(()))
        } else {
            // Pending: service is at capacity, caller should wait.
            // NOTE: a real implementation must store cx.waker() and wake it
            // when space frees up, or the waiting task may never be re-polled
            // (see the Pending and Wake-Ups section below):
            Poll::Pending
        }
    }
 
    fn call(&mut self, req: String) -> Self::Future {
        self.queue.push_back(req);
        std::future::ready(Ok(()))
    }
}

poll_ready returns Pending when the queue is full, signaling backpressure.

Why poll_ready Matters

use tower::Service;
use std::task::{Context, Poll};
 
// Without poll_ready, callers would:
// 1. Send requests regardless of capacity
// 2. Risk memory exhaustion from queued work
// 3. Have no way to propagate backpressure
 
// With poll_ready:
// 1. Callers wait until service has capacity
// 2. Backpressure propagates through middleware
// 3. Resources are managed proactively
 
fn example_usage() {
    // Poll readiness before each call:
    // 1. Check poll_ready
    // 2. If Ready, proceed to call
    // 3. If Pending, wait and retry poll_ready
}

poll_ready enables proactive resource management instead of reactive error handling.

Poll::Ready Meanings

use std::task::{Poll, Context};
use tower::Service;
 
fn readiness_meanings() {
    // Poll::Ready(Ok(())):
    // - Service is ready to accept ONE request
    // - call() can now be invoked
    // - Caller should proceed with request
 
    // Poll::Ready(Err(e)):
    // - Service has failed permanently
    // - Cannot recover; caller should handle error
    // - Service may be shutting down
 
    // Poll::Pending:
    // - Service is temporarily unavailable
    // - Caller should wait for wake-up
    // - Will be woken via cx.waker()
}

The three Poll variants encode service state for callers.
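These branches can be made concrete with a small std-only dispatcher that maps each readiness state to a caller-side action (the Action enum here is illustrative, not part of tower):

```rust
use std::task::Poll;

// Illustrative caller-side actions derived from readiness state.
#[derive(Debug, PartialEq)]
enum Action {
    Proceed,      // Ready(Ok): safe to invoke call()
    Wait,         // Pending: park until the waker fires, then re-poll
    Fail(String), // Ready(Err): surface the permanent failure
}

fn interpret<E: std::fmt::Display>(readiness: Poll<Result<(), E>>) -> Action {
    match readiness {
        Poll::Ready(Ok(())) => Action::Proceed,
        Poll::Ready(Err(e)) => Action::Fail(e.to_string()),
        Poll::Pending => Action::Wait,
    }
}
```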

Rate Limiting with poll_ready

use tower::Service;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
 
struct RateLimitedService<S> {
    inner: S,
    requests_per_second: u32,
    tokens: u32,
    last_refill: Instant,
}
 
impl<S, Request> Service<Request> for RateLimitedService<S>
where
    S: Service<Request>,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;
 
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Refill tokens based on elapsed time:
        let now = Instant::now();
        let elapsed = now.duration_since(self.last_refill);
        let tokens_to_add = (elapsed.as_secs_f64() * self.requests_per_second as f64) as u32;
        
        if tokens_to_add > 0 {
            self.tokens = (self.tokens + tokens_to_add).min(self.requests_per_second);
            self.last_refill = now;
        }
 
        // If tokens available, forward to inner service:
        if self.tokens > 0 {
            // Check inner service readiness; ready! propagates Pending,
            // and ? propagates inner errors (a bare `?` on a Poll would
            // silently discard Pending):
            std::task::ready!(self.inner.poll_ready(cx))?;
            Poll::Ready(Ok(()))
        } else {
            // Rate limited: caller must wait.
            // Waking ourselves immediately busy-polls until tokens refill;
            // a real implementation would arm a timer for the next refill:
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
 
    fn call(&mut self, req: Request) -> Self::Future {
        self.tokens -= 1;
        self.inner.call(req)
    }
}

Rate limiting uses poll_ready to defer requests until tokens are available.

Concurrency Limits

use tower::Service;
use std::task::{Context, Poll};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
 
struct ConcurrencyLimitedService<S> {
    inner: S,
    active: Arc<AtomicUsize>,
    max_concurrent: usize,
}
 
impl<S, Request> Service<Request> for ConcurrencyLimitedService<S>
where
    S: Service<Request>,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;
 
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let current = self.active.load(Ordering::Relaxed);
        
        if current < self.max_concurrent {
            // Capacity available: check inner service:
            self.inner.poll_ready(cx)
        } else {
            // At capacity: caller must wait for in-flight work to complete.
            // NOTE: without storing cx.waker() and waking it when a request
            // finishes, the waiting task will not be re-polled:
            Poll::Pending
        }
    }
 
    fn call(&mut self, req: Request) -> Self::Future {
        self.active.fetch_add(1, Ordering::Relaxed);
        // Note: actual implementation would decrement on completion
        self.inner.call(req)
    }
}

Concurrency limits use poll_ready to enforce maximum in-flight requests.
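The decrement-on-completion that the comment in call alludes to is usually handled with a drop guard, so the slot is released even if the response future is cancelled. A minimal std-only sketch (SlotGuard and acquire are illustrative names, not tower API):

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

// A guard that releases its concurrency slot when dropped, keeping the
// active count accurate even on cancellation or panic.
struct SlotGuard {
    active: Arc<AtomicUsize>,
}

impl Drop for SlotGuard {
    fn drop(&mut self) {
        self.active.fetch_sub(1, Ordering::Relaxed);
    }
}

// Try to claim a slot; None means "at capacity".
fn acquire(active: &Arc<AtomicUsize>, max: usize) -> Option<SlotGuard> {
    if active.fetch_add(1, Ordering::Relaxed) < max {
        Some(SlotGuard { active: active.clone() })
    } else {
        // Undo the optimistic increment:
        active.fetch_sub(1, Ordering::Relaxed);
        None
    }
}
```

tower's own ConcurrencyLimit uses semaphore permits for the same purpose; the point is that release must be tied to completion of the response, not to call returning.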

Middleware Chaining

use tower::Service;
use std::task::{Context, Poll};
 
// Middleware wraps an inner service and checks readiness:
struct LoggingService<S> {
    inner: S,
}
 
impl<S, Request> Service<Request> for LoggingService<S>
where
    S: Service<Request>,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;
 
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Forward readiness to inner:
        self.inner.poll_ready(cx)
    }
 
    fn call(&mut self, req: Request) -> Self::Future {
        println!("Processing request");
        self.inner.call(req)
    }
}
 
// Chaining readiness checks:
struct TimeoutService<S> {
    inner: S,
}
 
impl<S, Request> Service<Request> for TimeoutService<S>
where
    S: Service<Request>,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;
 
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Each layer propagates readiness:
        self.inner.poll_ready(cx)
    }
 
    fn call(&mut self, req: Request) -> Self::Future {
        self.inner.call(req)
    }
}

Each middleware layer propagates poll_ready calls to inner services.

Load Shedding

use tower::Service;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
 
enum LoadShedError {
    Overloaded,
}
 
struct LoadSheddingService<S> {
    inner: S,
    queue_depth: Arc<AtomicUsize>,
    max_depth: usize,
}
 
impl<S, Request> Service<Request> for LoadSheddingService<S>
where
    S: Service<Request>,
    S::Future: 'static,
{
    type Response = S::Response;
    type Error = LoadShedError;
    type Future = Pin<Box<dyn Future<Output = Result<S::Response, LoadShedError>>>>;
 
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let depth = self.queue_depth.load(Ordering::Relaxed);
        
        if depth >= self.max_depth {
            // Shed load: return an error instead of accepting.
            // This differs from Pending: we actively reject rather than wait.
            Poll::Ready(Err(LoadShedError::Overloaded))
        } else {
            self.inner.poll_ready(cx).map_err(|_| LoadShedError::Overloaded)
        }
    }
 
    fn call(&mut self, req: Request) -> Self::Future {
        // Track depth and forward to the inner service, mapping its error:
        self.queue_depth.fetch_add(1, Ordering::Relaxed);
        let depth = self.queue_depth.clone();
        let fut = self.inner.call(req);
        Box::pin(async move {
            let result = fut.await.map_err(|_| LoadShedError::Overloaded);
            depth.fetch_sub(1, Ordering::Relaxed);
            result
        })
    }
}

Load shedding returns errors from poll_ready to reject requests proactively.

Always-Ready Services

use tower::Service;
use std::task::{Context, Poll};
 
// Simple services that are always ready:
struct EchoService;
 
impl Service<String> for EchoService {
    type Response = String;
    type Error = std::convert::Infallible;
    type Future = std::future::Ready<Result<String, Self::Error>>;
 
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Always ready: no capacity constraints
        Poll::Ready(Ok(()))
    }
 
    fn call(&mut self, req: String) -> Self::Future {
        std::future::ready(Ok(req))
    }
}
 
// Stateless services are often always ready:
struct IdentityService;
 
impl<T> Service<T> for IdentityService {
    type Response = T;
    type Error = std::convert::Infallible;
    type Future = std::future::Ready<Result<T, Self::Error>>;
 
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
 
    fn call(&mut self, req: T) -> Self::Future {
        std::future::ready(Ok(req))
    }
}

Stateless services can always return Ready(Ok(())).

Pending and Wake-Ups

use tower::Service;
use std::task::{Context, Poll, Waker};
use std::cell::RefCell;
use std::rc::Rc;
 
struct BufferedService {
    buffer: Rc<RefCell<Vec<String>>>,
    max_size: usize,
    waker: Option<Waker>,
}
 
impl Service<String> for BufferedService {
    type Response = ();
    type Error = &'static str;
    type Future = std::future::Ready<Result<(), Self::Error>>;
 
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let buffer_len = self.buffer.borrow().len();
        
        if buffer_len < self.max_size {
            Poll::Ready(Ok(()))
        } else {
            // Store waker to be called when space available:
            self.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
 
    fn call(&mut self, req: String) -> Self::Future {
        self.buffer.borrow_mut().push(req);
        std::future::ready(Ok(()))
    }
}
 
impl BufferedService {
    // Draining is what frees space, so this is where a Pending caller
    // gets woken; call fills the buffer, so waking there would be wrong:
    fn process_one(&mut self) -> Option<String> {
        let item = self.buffer.borrow_mut().pop();
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
        item
    }
}

Store the Waker and call it when the service becomes ready.

Connection Pooling

use tower::Service;
use std::task::{Context, Poll};
use std::collections::VecDeque;
 
struct PoolService<C> {
    pool: VecDeque<C>,
    waiters: VecDeque<tokio::sync::oneshot::Sender<C>>,
}
 
impl<C, Request> Service<Request> for PoolService<C>
where
    C: Service<Request>,
{
    type Response = C::Response;
    type Error = C::Error;
    type Future = C::Future;
 
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        if self.pool.is_empty() {
            // No connections available:
            // Real implementation would register a waiter
            Poll::Pending
        } else {
            // Check if pooled connection is ready:
            self.pool.front_mut()
                .expect("pool not empty")
                .poll_ready(cx)
        }
    }
 
    fn call(&mut self, req: Request) -> Self::Future {
        let mut conn = self.pool.pop_front().expect("poll_ready ensures availability");
        conn.call(req)
        // Connection would be returned to pool after future completes
    }
}

Connection pools use poll_ready to wait for available connections.

Using Services with poll_ready

use tower::Service;
use std::future::Future;
use std::task::{Context, Poll};
use std::pin::Pin;
 
// Correct usage pattern:
async fn call_service<S, Request>(service: &mut S, request: Request) -> Result<S::Response, S::Error>
where
    S: Service<Request>,
{
    // Wait for service to be ready:
    std::future::poll_fn(|cx| service.poll_ready(cx)).await?;
    
    // Now safe to call:
    service.call(request).await
}
 
// Manual polling (lower level):
fn manual_call<S, Request>(service: &mut S, request: Request, cx: &mut Context<'_>)
    -> Poll<Result<S::Response, S::Error>>
where
    S: Service<Request>,
{
    // Check readiness:
    match service.poll_ready(cx) {
        Poll::Ready(Ok(())) => {
            // Ready: start the call; a real poller would store this
            // future and drive it on subsequent polls:
            let _call_future = service.call(request);
            Poll::Pending
        }
        Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
        Poll::Pending => Poll::Pending,
    }
}

Always wait for poll_ready to return Ready(Ok(())) before calling.

Tower's ServiceFn Helper

use tower::{service_fn, ServiceExt};
 
// service_fn creates a service that's always ready:
async fn helper_example() {
    let service = service_fn(|req: String| async move {
        Ok::<String, std::convert::Infallible>(format!("Echo: {}", req))
    });
    
    // Always ready, so it can be called immediately.
    // oneshot comes from the ServiceExt extension trait and consumes the service:
    let response = service.oneshot("hello".to_string()).await.unwrap();
    println!("{}", response);
}

service_fn wraps a function into a service that's always ready.

Buffer Middleware

use tower::Service;
use tower::buffer::Buffer;
use std::task::{Context, Poll};
 
// Buffer wraps a service with a bounded request queue:
async fn buffer_example() {
    // Creates a bounded channel and spawns a worker task
    // (requires the "buffer" feature and a tokio runtime):
    let _service = Buffer::new(
        SomeService,
        1024,  // buffer capacity
    );
    
    // poll_ready returns Pending when the buffer is full;
    // this is handled automatically by the Buffer middleware.
}
 
struct SomeService;
 
impl Service<String> for SomeService {
    type Response = String;
    type Error = std::convert::Infallible;
    type Future = std::future::Ready<Result<String, Self::Error>>;
 
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
 
    fn call(&mut self, req: String) -> Self::Future {
        std::future::ready(Ok(req))
    }
}

Buffer middleware uses a channel to provide bounded buffering.

Real-World: HTTP Server

use tower::Service;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
 
// Hyper uses Service for each request:
struct HttpService {
    rate_limiter: RateLimiter,
    handler: Handler,
}
 
impl Service<hyper::Request<hyper::Body>> for HttpService {
    type Response = hyper::Response<hyper::Body>;
    type Error = hyper::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
 
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Check rate limiter first.
        // Note: check() consumes a token on every poll, even when no call
        // follows; a production limiter would separate peeking from taking.
        if !self.rate_limiter.check() {
            // Rate limited: wake immediately to re-poll (this busy-waits; a
            // real implementation would arm a timer for the next token)
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        
        // Check handler readiness:
        self.handler.poll_ready(cx)
    }
 
    fn call(&mut self, req: hyper::Request<hyper::Body>) -> Self::Future {
        // Clone the handler so the boxed 'static future doesn't borrow self:
        let handler = self.handler.clone();
        Box::pin(async move { handler.handle(req).await })
    }
}
 
struct RateLimiter {
    tokens: usize,
}
 
impl RateLimiter {
    fn check(&mut self) -> bool {
        if self.tokens > 0 {
            self.tokens -= 1;
            true
        } else {
            false
        }
    }
}

HTTP servers use poll_ready for rate limiting before accepting requests.

Real-World: gRPC Client

use tower::Service;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
 
// gRPC clients use poll_ready for connection management:
struct GrpcClient {
    connection_pool: Arc<ConnectionPool>,
    max_pending: usize,
    pending: Arc<AtomicUsize>,
}
 
impl Service<Vec<u8>> for GrpcClient {
    type Response = Vec<u8>;
    type Error = GrpcError;
    type Future = Pin<Box<dyn Future<Output = Result<Vec<u8>, GrpcError>> + Send>>;
 
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), GrpcError>> {
        let pending = self.pending.load(Ordering::Relaxed);
        
        if pending >= self.max_pending {
            // Too many pending requests: backpressure
            Poll::Pending
        } else if self.connection_pool.has_connection() {
            Poll::Ready(Ok(()))
        } else {
            // Waiting for connection:
            Poll::Pending
        }
    }
 
    fn call(&mut self, req: Vec<u8>) -> Self::Future {
        self.pending.fetch_add(1, Ordering::Relaxed);
        let pending = self.pending.clone();
        // Clone the Arc so the boxed 'static future doesn't borrow self:
        let pool = self.connection_pool.clone();
        Box::pin(async move {
            let result = pool.send(req).await;
            pending.fetch_sub(1, Ordering::Relaxed);
            result
        })
    }
}
 
struct ConnectionPool;
impl ConnectionPool {
    fn has_connection(&self) -> bool { true }
    async fn send(&self, _: Vec<u8>) -> Result<Vec<u8>, GrpcError> { Ok(vec![]) }
}
 
struct GrpcError;

gRPC clients enforce concurrency limits through poll_ready.

Testing Services

use tower::Service;
use std::task::{Context, Poll, Waker};
 
// A minimal token-bucket service for testing readiness:
struct RateLimitedService {
    tokens: u32,
}
 
impl RateLimitedService {
    fn new(tokens: u32) -> Self {
        Self { tokens }
    }
}
 
impl Service<&'static str> for RateLimitedService {
    type Response = ();
    type Error = std::convert::Infallible;
    type Future = std::future::Ready<Result<(), Self::Error>>;
 
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        if self.tokens > 0 { Poll::Ready(Ok(())) } else { Poll::Pending }
    }
 
    fn call(&mut self, _req: &'static str) -> Self::Future {
        self.tokens -= 1;
        std::future::ready(Ok(()))
    }
}
 
#[test]
fn test_rate_limiting() {
    let mut service = RateLimitedService::new(2); // two tokens available
    // Waker::noop (stable since Rust 1.85) gives a no-op context for tests:
    let cx = &mut Context::from_waker(Waker::noop());
 
    // First two requests should be ready:
    assert!(matches!(service.poll_ready(cx), Poll::Ready(Ok(()))));
    let _ = service.call("req1");
    assert!(matches!(service.poll_ready(cx), Poll::Ready(Ok(()))));
    let _ = service.call("req2");
 
    // Tokens exhausted: readiness now reports Pending (rate limited):
    assert!(matches!(service.poll_ready(cx), Poll::Pending));
}

Test readiness state transitions to verify backpressure behavior.

Key Points

fn key_points() {
    // 1. poll_ready returns Ready(Ok(())) when service can accept request
    // 2. poll_ready returns Pending when at capacity or waiting
    // 3. poll_ready returns Ready(Err) for permanent failures
    // 4. Must poll_ready before each call()
    // 5. Pending means caller should wait for wake-up
    // 6. Enables backpressure propagation through middleware
    // 7. Rate limiters use Pending for token exhaustion
    // 8. Concurrency limits use Pending when at max
    // 9. Load shedders return Ready(Err) to reject requests
    // 10. Connection pools use Pending when empty
    // 11. Stateless services are always ready
    // 12. Waker must be stored and called when becoming ready
    // 13. Buffer middleware provides bounded channel buffering
    // 14. Each middleware layer forwards poll_ready to inner
    // 15. Proactive rejection beats reactive error handling
}

Key insight: poll_ready is the mechanism by which services communicate their capacity to accept work, enabling a cascade of backpressure from the innermost resource-constrained service through every middleware layer to the caller. The design is intentional: poll_ready separates capacity checking from request execution, allowing callers to decide whether to wait, shed load, or route elsewhere before committing to a request. When poll_ready returns Pending, the service stores the Waker and calls it when capacity becomes available, which allows efficient waiting without busy-polling. The Ready(Err(_)) variant signals permanent failure (connection closed, service shutting down), distinct from Pending, which signals temporary unavailability. Middleware layers must propagate poll_ready calls to inner services, composing readiness checks: a rate limiter wrapping a connection pool checks rate limits first, then checks pool availability. This composition means backpressure propagates automatically: if the innermost service returns Pending, every outer layer's poll_ready will also return Pending, pushing the wait all the way to the caller. The pattern of poll_ready followed by call must be repeated for each request; calling without confirming readiness risks overwhelming the service, or panicking in implementations that assert readiness.