How does tower::Service::poll_ready indicate readiness before processing a request?
tower::Service::poll_ready is a poll-based readiness check: it returns Poll::Ready(Ok(())) when the service can accept a request and Poll::Pending when it is at capacity, enabling backpressure to propagate from downstream services through middleware layers to upstream callers. This design lets a service signal that it is overloaded, rate-limited, or waiting for resources before the caller commits to sending a request.
The Service Trait and poll_ready
use tower::Service;
use std::future::Future;
use std::task::{Context, Poll};
// Simplified Service trait:
pub trait Service<Request> {
type Response;
type Error;
type Future: Future<Output = Result<Self::Response, Self::Error>>;
// Check if the service is ready:
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
// Process the request:
fn call(&mut self, req: Request) -> Self::Future;
}
poll_ready must return Ready before call is invoked for each request.
Basic Readiness Pattern
use tower::Service;
use std::task::{Context, Poll};
use std::collections::VecDeque;
struct QueueService {
queue: VecDeque<String>,
max_capacity: usize,
}
impl Service<String> for QueueService {
type Response = ();
type Error = &'static str;
type Future = std::future::Ready<Result<(), Self::Error>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Ready if queue has capacity:
if self.queue.len() < self.max_capacity {
Poll::Ready(Ok(()))
} else {
// Pending: service is at capacity, caller should wait.
// A real implementation would also store the waker from cx so the
// waiting task can be woken when space frees up (see the Pending
// and Wake-Ups section).
Poll::Pending
}
}
fn call(&mut self, req: String) -> Self::Future {
self.queue.push_back(req);
std::future::ready(Ok(()))
}
}
poll_ready returns Pending when the queue is full, signaling backpressure.
Why poll_ready Matters
use tower::Service;
use std::task::{Context, Poll};
// Without poll_ready, callers would:
// 1. Send requests regardless of capacity
// 2. Risk memory exhaustion from queued work
// 3. Have no way to propagate backpressure
// With poll_ready:
// 1. Callers wait until service has capacity
// 2. Backpressure propagates through middleware
// 3. Resources are managed proactively
fn example_usage() {
// Poll readiness before each call:
// 1. Check poll_ready
// 2. If Ready, proceed to call
// 3. If Pending, wait and retry poll_ready
}
poll_ready enables proactive resource management instead of reactive error handling.
Poll::Ready Meanings
use std::task::{Poll, Context};
use tower::Service;
fn readiness_meanings() {
// Poll::Ready(Ok(())):
// - Service is ready to accept ONE request
// - call() can now be invoked
// - Caller should proceed with request
// Poll::Ready(Err(e)):
// - Service has failed permanently
// - Cannot recover; caller should handle error
// - Service may be shutting down
// Poll::Pending:
// - Service is temporarily unavailable
// - Caller should wait for wake-up
// - Will be woken via cx.waker()
}
The three Poll variants encode service state for callers.
Rate Limiting with poll_ready
use tower::Service;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
struct RateLimitedService<S> {
inner: S,
requests_per_second: u32,
tokens: u32,
last_refill: Instant,
}
impl<S, Request> Service<Request> for RateLimitedService<S>
where
S: Service<Request>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Refill tokens based on elapsed time:
let now = Instant::now();
let elapsed = now.duration_since(self.last_refill);
let tokens_to_add = (elapsed.as_secs_f64() * self.requests_per_second as f64) as u32;
if tokens_to_add > 0 {
self.tokens = (self.tokens + tokens_to_add).min(self.requests_per_second);
self.last_refill = now;
}
// If a token is available, defer to the inner service's readiness.
// (Using `?` here would discard the inner service's Pending state;
// forwarding the Poll directly preserves it.)
if self.tokens > 0 {
self.inner.poll_ready(cx)
} else {
// Rate limited: caller must wait.
// Note: waking ourselves immediately causes busy-polling; a real
// implementation would arm a timer for the next token refill.
cx.waker().wake_by_ref();
Poll::Pending
}
}
fn call(&mut self, req: Request) -> Self::Future {
self.tokens -= 1;
self.inner.call(req)
}
}
Rate limiting uses poll_ready to defer requests until tokens are available.
Concurrency Limits
use tower::Service;
use std::task::{Context, Poll};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
struct ConcurrencyLimitedService<S> {
inner: S,
active: Arc<AtomicUsize>,
max_concurrent: usize,
}
impl<S, Request> Service<Request> for ConcurrencyLimitedService<S>
where
S: Service<Request>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let current = self.active.load(Ordering::Relaxed);
if current < self.max_concurrent {
// Capacity available: check inner service:
self.inner.poll_ready(cx)
} else {
// At capacity: caller must wait for in-flight requests to finish.
// A real implementation (e.g. tower's ConcurrencyLimit, built on a
// semaphore) registers cx.waker() to be woken on completion.
Poll::Pending
}
}
fn call(&mut self, req: Request) -> Self::Future {
self.active.fetch_add(1, Ordering::Relaxed);
// Note: actual implementation would decrement on completion
self.inner.call(req)
}
}
Concurrency limits use poll_ready to enforce maximum in-flight requests.
Middleware Chaining
use tower::Service;
use std::task::{Context, Poll};
// Middleware wraps an inner service and checks readiness:
struct LoggingService<S> {
inner: S,
}
impl<S, Request> Service<Request> for LoggingService<S>
where
S: Service<Request>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Forward readiness to inner:
self.inner.poll_ready(cx)
}
fn call(&mut self, req: Request) -> Self::Future {
println!("Processing request");
self.inner.call(req)
}
}
// Chaining readiness checks:
struct TimeoutService<S> {
inner: S,
}
impl<S, Request> Service<Request> for TimeoutService<S>
where
S: Service<Request>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Each layer propagates readiness:
self.inner.poll_ready(cx)
}
fn call(&mut self, req: Request) -> Self::Future {
self.inner.call(req)
}
}
Each middleware layer propagates poll_ready calls to inner services.
Load Shedding
use tower::Service;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
enum LoadShedError {
Overloaded,
}
struct LoadSheddingService<S> {
inner: S,
queue_depth: Arc<AtomicUsize>,
max_depth: usize,
}
impl<S, Request> Service<Request> for LoadSheddingService<S>
where
S: Service<Request>,
S::Future: 'static,
{
type Response = S::Response;
type Error = LoadShedError;
type Future = Pin<Box<dyn Future<Output = Result<S::Response, LoadShedError>>>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let depth = self.queue_depth.load(Ordering::Relaxed);
if depth >= self.max_depth {
// Shed load: return an error instead of accepting.
// This differs from Pending: we actively reject rather than wait.
Poll::Ready(Err(LoadShedError::Overloaded))
} else {
self.inner.poll_ready(cx).map_err(|_| LoadShedError::Overloaded)
}
}
fn call(&mut self, req: Request) -> Self::Future {
// Forward to the inner service, mapping its error type to ours:
let fut = self.inner.call(req);
Box::pin(async move { fut.await.map_err(|_| LoadShedError::Overloaded) })
}
}
Load shedding returns errors from poll_ready to reject requests proactively.
Always-Ready Services
use tower::Service;
use std::task::{Context, Poll};
// Simple services that are always ready:
struct EchoService;
impl Service<String> for EchoService {
type Response = String;
type Error = std::convert::Infallible;
type Future = std::future::Ready<Result<String, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Always ready: no capacity constraints
Poll::Ready(Ok(()))
}
fn call(&mut self, req: String) -> Self::Future {
std::future::ready(Ok(req))
}
}
// Stateless services are often always ready:
struct IdentityService;
impl<T> Service<T> for IdentityService {
type Response = T;
type Error = std::convert::Infallible;
type Future = std::future::Ready<Result<T, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: T) -> Self::Future {
std::future::ready(Ok(req))
}
}
Stateless services can always return Ready(Ok(())).
Pending and Wake-Ups
use tower::Service;
use std::task::{Context, Poll, Waker};
use std::cell::RefCell;
use std::rc::Rc;
struct BufferedService {
buffer: Rc<RefCell<Vec<String>>>,
max_size: usize,
waker: Option<Waker>,
}
impl Service<String> for BufferedService {
type Response = ();
type Error = &'static str;
type Future = std::future::Ready<Result<(), Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let buffer_len = self.buffer.borrow().len();
if buffer_len < self.max_size {
Poll::Ready(Ok(()))
} else {
// Store waker to be called when space available:
self.waker = Some(cx.waker().clone());
Poll::Pending
}
}
fn call(&mut self, req: String) -> Self::Future {
self.buffer.borrow_mut().push(req);
std::future::ready(Ok(()))
}
}
impl BufferedService {
// Consumer side: draining the buffer is what frees space, so this is
// where the stored waker must be invoked:
fn process_one(&mut self) -> Option<String> {
let item = self.buffer.borrow_mut().pop();
if item.is_some() {
if let Some(waker) = self.waker.take() {
waker.wake();
}
}
item
}
}
Store the Waker and call it when the service becomes ready.
Connection Pooling
use tower::Service;
use std::task::{Context, Poll};
use std::collections::VecDeque;
struct PoolService<C> {
pool: VecDeque<C>,
waiters: VecDeque<tokio::sync::oneshot::Sender<C>>, // handed a connection when one is returned (not shown)
}
impl<C, Request> Service<Request> for PoolService<C>
where
C: Service<Request>,
{
type Response = C::Response;
type Error = C::Error;
type Future = C::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.pool.is_empty() {
// No connections available:
// Real implementation would register a waiter
Poll::Pending
} else {
// Check if pooled connection is ready:
self.pool.front_mut()
.expect("pool not empty")
.poll_ready(cx)
}
}
fn call(&mut self, req: Request) -> Self::Future {
let mut conn = self.pool.pop_front().expect("poll_ready ensures availability");
conn.call(req)
// Connection would be returned to pool after future completes
}
}
Connection pools use poll_ready to wait for available connections.
Using Services with poll_ready
use tower::Service;
use std::future::Future;
use std::task::{Context, Poll};
use std::pin::Pin;
// Correct usage pattern:
async fn call_service<S, Request>(service: &mut S, request: Request) -> Result<S::Response, S::Error>
where
S: Service<Request>,
{
// Wait for service to be ready:
std::future::poll_fn(|cx| service.poll_ready(cx)).await?;
// Now safe to call:
service.call(request).await
}
// Manual polling (lower level):
fn manual_call<S, Request>(service: &mut S, request: Request, cx: &mut Context<'_>)
-> Poll<Result<S::Response, S::Error>>
where
S: Service<Request>,
{
// Check readiness:
match service.poll_ready(cx) {
Poll::Ready(Ok(())) => {
// Ready: start the call. Driving the returned future to
// completion is elided in this sketch:
let _call_future = service.call(request);
Poll::Pending
}
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
}
}
Always wait for poll_ready to return Ready(Ok(())) before calling.
Tower's ServiceFn Helper
use tower::{service_fn, ServiceExt}; // ServiceExt provides oneshot
// service_fn creates a service that's always ready:
async fn helper_example() {
let service = service_fn(|req: String| async move {
Ok::<String, std::convert::Infallible>(format!("Echo: {}", req))
});
// Always ready, can call immediately:
let response = service.oneshot("hello".to_string()).await.unwrap();
println!("{}", response);
}
service_fn wraps a function into a service that's always ready.
Buffer Middleware
use tower::Service;
use tower::buffer::Buffer;
use std::task::{Context, Poll};
// Buffer wraps a service with capacity:
async fn buffer_example() {
// Creates a bounded channel for requests:
let service = Buffer::new(
SomeService, // unit struct: no constructor needed
1024, // buffer capacity
);
// poll_ready returns Pending when buffer is full:
// This is handled automatically by Buffer middleware
}
struct SomeService;
impl Service<String> for SomeService {
type Response = String;
type Error = std::convert::Infallible;
type Future = std::future::Ready<Result<String, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: String) -> Self::Future {
std::future::ready(Ok(req))
}
}
Buffer middleware uses a channel to provide bounded buffering.
Real-World: HTTP Server
use tower::Service;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
// Hyper uses Service for each request:
struct HttpService {
rate_limiter: RateLimiter,
handler: Handler, // application-defined handler type (elided)
}
impl Service<hyper::Request<hyper::Body>> for HttpService {
type Response = hyper::Response<hyper::Body>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Check the rate limiter first. Note: check() consumes a token, so a
// production version would reserve it here and spend it in call():
if !self.rate_limiter.check() {
// Rate limited: wake immediately to re-poll (busy-polling; a real
// implementation would arm a timer for the next token instead):
cx.waker().wake_by_ref();
return Poll::Pending;
}
// Check handler readiness:
self.handler.poll_ready(cx)
}
fn call(&mut self, req: hyper::Request<hyper::Body>) -> Self::Future {
Box::pin(self.handler.handle(req))
}
}
struct RateLimiter {
tokens: usize,
}
impl RateLimiter {
fn check(&mut self) -> bool {
if self.tokens > 0 {
self.tokens -= 1;
true
} else {
false
}
}
}
HTTP servers use poll_ready for rate limiting before accepting requests.
Real-World: gRPC Client
use tower::Service;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
// gRPC clients use poll_ready for connection management:
struct GrpcClient {
connection_pool: Arc<ConnectionPool>,
max_pending: usize,
pending: Arc<AtomicUsize>,
}
impl Service<Vec<u8>> for GrpcClient {
type Response = Vec<u8>;
type Error = GrpcError;
type Future = Pin<Box<dyn Future<Output = Result<Vec<u8>, GrpcError>> + Send>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), GrpcError>> {
let pending = self.pending.load(Ordering::Relaxed);
if pending >= self.max_pending {
// Too many pending requests: backpressure.
// (A real implementation would register cx.waker() here.)
Poll::Pending
} else if self.connection_pool.has_connection() {
Poll::Ready(Ok(()))
} else {
// Waiting for connection:
Poll::Pending
}
}
fn call(&mut self, req: Vec<u8>) -> Self::Future {
self.pending.fetch_add(1, Ordering::Relaxed);
// Clone the Arcs so the boxed future owns its state instead of
// borrowing self:
let pool = self.connection_pool.clone();
let pending = self.pending.clone();
Box::pin(async move {
let result = pool.send(req).await;
pending.fetch_sub(1, Ordering::Relaxed);
result
})
}
}
struct ConnectionPool;
impl ConnectionPool {
fn has_connection(&self) -> bool { true }
async fn send(&self, _: Vec<u8>) -> Result<Vec<u8>, GrpcError> { Ok(vec![]) }
}
struct GrpcError;
gRPC clients enforce concurrency limits through poll_ready.
Testing Services
use tower::Service;
use std::task::{Context, Poll};
// Testing readiness behavior:
#[test]
fn test_rate_limiting() {
let mut service = RateLimitedService::new(2); // 2 requests per second
// First request should be ready:
let cx = &mut Context::from_waker(todo!());
assert!(matches!(service.poll_ready(cx), Poll::Ready(Ok(()))));
// After two requests:
service.call(req1);
service.call(req2);
// Should be pending (rate limited):
assert!(matches!(service.poll_ready(cx), Poll::Pending));
}
struct RateLimitedService {
requests_per_second: u32,
tokens: u32,
}
impl RateLimitedService {
fn new(rps: u32) -> Self {
Self { requests_per_second: rps, tokens: rps }
}
}Test readiness state transitions to verify backpressure behavior.
Key Points
fn key_points() {
// 1. poll_ready returns Ready(Ok(())) when service can accept request
// 2. poll_ready returns Pending when at capacity or waiting
// 3. poll_ready returns Ready(Err) for permanent failures
// 4. Must poll_ready before each call()
// 5. Pending means caller should wait for wake-up
// 6. Enables backpressure propagation through middleware
// 7. Rate limiters use Pending for token exhaustion
// 8. Concurrency limits use Pending when at max
// 9. Load shedders return Ready(Err) to reject requests
// 10. Connection pools use Pending when empty
// 11. Stateless services are always ready
// 12. Waker must be stored and called when becoming ready
// 13. Buffer middleware provides bounded channel buffering
// 14. Each middleware layer forwards poll_ready to inner
// 15. Proactive rejection beats reactive error handling
}
Key insight: poll_ready is the mechanism by which services communicate their capacity to accept work, enabling a cascade of backpressure from the innermost resource-constrained service through every middleware layer to the caller. The design is intentional: poll_ready separates capacity checking from request execution, allowing callers to decide whether to wait, shed load, or route elsewhere before committing to a request. When poll_ready returns Pending, the service stores the Waker and calls it when capacity becomes available—this ensures efficient waiting without polling. The Ready(Err(_)) variant signals permanent failure (connection closed, service shutting down), distinct from Pending which signals temporary unavailability. Middleware layers must propagate poll_ready calls to inner services, composing readiness checks: a rate limiter wrapping a connection pool checks rate limits first, then checks pool availability. This composition means backpressure propagates automatically: if the innermost service returns Pending, every outer layer's poll_ready will also return Pending, pushing the wait all the way to the caller. The pattern poll_ready → call must be followed for each request—calling without readiness risks overwhelming the service or hitting asserts in debug builds.
