Rust walkthroughs
How does tower::Service's poll_ready method enable backpressure in middleware chains?
tower::Service's poll_ready method enables backpressure by allowing services to signal their readiness to accept new requests before work is committed. When poll_ready returns Pending, the caller must wait before calling call, preventing the service from being overwhelmed. This creates a propagation chain: downstream services push back through poll_ready, middleware forwards this signal, and upstream callers eventually stop producing work. The mechanism ensures that load is naturally throttled at the source when downstream capacity is exhausted, without explicit coordination or dropped requests.
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait Service<Request> {
type Response;
type Error;
type Future: Future<Output = Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
fn call(&mut self, req: Request) -> Self::Future;
}
poll_ready checks if the service can accept a request; call executes the request.
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
// Hypothetical service without backpressure
struct UnboundedService {
pending: Arc<AtomicUsize>,
max_concurrent: usize,
}
impl UnboundedService {
fn call(&self, req: String) -> impl std::future::Future<Output = String> {
// Always accept, even if overloaded
self.pending.fetch_add(1, Ordering::SeqCst);
async move {
// Process request
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
req
}
}
}
// Problem: Requests pile up, memory grows, latency increases
// No mechanism to tell caller to slow down
Without poll_ready, services accept all requests until resources are exhausted.
use tower::Service;
use std::task::{Context, Poll};
struct BoundedService<Inner> {
inner: Inner,
pending: usize,
max_pending: usize,
}
impl<Inner, Request> Service<Request> for BoundedService<Inner>
where
Inner: Service<Request>,
{
type Response = Inner::Response;
type Error = Inner::Error;
type Future = Inner::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Check capacity before accepting
        if self.pending >= self.max_pending {
            // Signal: not ready, caller should wait.
            // (A real implementation must store cx.waker() here and wake it
            // when a request completes, or the caller is never re-polled.)
            Poll::Pending
} else {
// Ready to accept
self.inner.poll_ready(cx)
}
}
    fn call(&mut self, req: Request) -> Self::Future {
        self.pending += 1; // a real implementation decrements on completion
        self.inner.call(req)
    }
}
poll_ready returns Pending when capacity is exhausted, preventing new requests.
use tower::Service;
use std::task::{Context, Poll};
// Middleware that wraps an inner service
struct LoggingService<Inner> {
inner: Inner,
}
impl<Inner, Request> Service<Request> for LoggingService<Inner>
where
Inner: Service<Request>,
{
type Response = Inner::Response;
type Error = Inner::Error;
type Future = Inner::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Forward backpressure from inner service
println!("poll_ready: checking inner service");
self.inner.poll_ready(cx)
}
fn call(&mut self, req: Request) -> Self::Future {
println!("call: forwarding request");
self.inner.call(req)
}
}
// If inner.poll_ready() returns Pending, this middleware propagates Pending
// Backpressure flows up through all middleware layers
Each middleware forwards poll_ready from its inner service, propagating backpressure.
use tower::Service;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
struct RateLimited<Inner> {
inner: Inner,
concurrent: Arc<AtomicUsize>,
max_concurrent: usize,
}
impl<Inner, Request> Service<Request> for RateLimited<Inner>
where
Inner: Service<Request>,
{
type Response = Inner::Response;
type Error = Inner::Error;
type Future = Inner::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let current = self.concurrent.load(Ordering::Relaxed);
if current >= self.max_concurrent {
// At capacity: signal not ready
// Caller must wait and poll again later
cx.waker().wake_by_ref(); // Suggest immediate re-poll
Poll::Pending
} else {
// Has capacity: also check inner service
self.inner.poll_ready(cx)
}
}
    fn call(&mut self, req: Request) -> Self::Future {
        // A real implementation decrements `concurrent` when the response
        // future completes (e.g. via a drop guard on a wrapper future).
        self.concurrent.fetch_add(1, Ordering::Relaxed);
        self.inner.call(req)
    }
}
Rate limiting via poll_ready prevents overload without dropping requests.
use tower::Service;
use tower::buffer::Buffer;
use std::task::{Context, Poll};
// Buffer creates a bounded channel between caller and service
// It allows some requests to queue without blocking the caller
async fn use_buffer() -> Result<(), Box<dyn std::error::Error>> {
    let service = SomeService::new(); // placeholder: any compatible Service
    // Buffer allows up to 10 queued requests
    let _buffered = Buffer::new(service, 10);
// poll_ready on Buffer:
// - Returns Ready if channel has capacity
// - Returns Pending if channel is full
// - Internally queues requests
// When inner service is slow:
// - Buffer fills up
// - poll_ready returns Pending
// - Caller experiences backpressure
Ok(())
}
Buffer provides bounded queueing with backpressure when full.
use tower::Service;
async fn drive_service<S, Request>(mut service: S, requests: Vec<Request>)
where
    S: Service<Request>,
    S::Response: std::fmt::Debug,
    S::Error: std::fmt::Debug,
{
for req in requests {
// Wait for service to be ready
// This is the backpressure mechanism in action
futures::future::poll_fn(|cx| service.poll_ready(cx))
.await
.expect("service failed");
// Now safe to call
let future = service.call(req);
let result = future.await;
println!("Request completed: {:?}", result);
}
}
The pattern: poll_ready then call, ensuring the service never exceeds its capacity.
use tower::{Service, ServiceBuilder};
use tower::limit::ConcurrencyLimitLayer;
use tower::load_shed::LoadShedLayer;
use tower::timeout::TimeoutLayer;
use std::time::Duration;
async fn build_service() {
    let inner = DatabaseService::new();
    // Stack of middleware, each propagating backpressure.
    // With ServiceBuilder, the first layer added is the outermost.
    let _service = ServiceBuilder::new()
        .layer(LoadShedLayer::new()) // Shed load when overwhelmed
        .layer(TimeoutLayer::new(Duration::from_secs(30))) // Per-request timeout
        .layer(ConcurrencyLimitLayer::new(100)) // Limit concurrent requests
        .service(inner);
// Backpressure flow:
// 1. LoadShed checks if overloaded, may reject immediately
// 2. Timeout sets up timeout mechanism
// 3. ConcurrencyLimit checks pending count
// 4. DatabaseService checks its capacity
// Each layer's poll_ready calls the next layer's poll_ready
}
struct DatabaseService;
impl DatabaseService {
fn new() -> Self { DatabaseService }
}
impl Service<String> for DatabaseService {
type Response = String;
type Error = std::io::Error;
type Future = std::future::Ready<Result<String, std::io::Error>>;
fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>)
-> std::task::Poll<Result<(), Self::Error>>
{
// Check database connection pool
std::task::Poll::Ready(Ok(()))
}
fn call(&mut self, req: String) -> Self::Future {
std::future::ready(Ok(format!("Processed: {}", req)))
}
}
Each layer's poll_ready propagates signals from downstream to upstream.
use tower::Service;
use std::task::{Context, Poll};
// Load shedding: reject requests when overloaded
struct LoadShed<Inner> {
inner: Inner,
overloaded: bool,
}
impl<Inner, Request> Service<Request> for LoadShed<Inner>
where
Inner: Service<Request>,
{
type Response = Inner::Response;
type Error = Inner::Error;
type Future = Inner::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        if self.overloaded {
            // Signal error: service cannot accept requests.
            // This causes the caller to fail fast rather than wait.
            // (tower's real LoadShed boxes errors so it can inject its own
            // overload error; this sketch with Error = Inner::Error cannot.)
            Poll::Ready(Err(/* overload error */))
} else {
self.inner.poll_ready(cx)
}
}
fn call(&mut self, req: Request) -> Self::Future {
self.inner.call(req)
}
}
// Backpressure: caller waits when service is busy
// Load shedding: caller gets error when service is busy
Poll::Ready(Err(...)) signals failure; Poll::Pending signals wait.
use tower::Service;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use std::collections::VecDeque;
struct ConcurrencyLimit<Inner> {
inner: Inner,
current: Arc<AtomicUsize>,
limit: usize,
waiters: VecDeque<std::task::Waker>,
}
impl<Inner, Request> Service<Request> for ConcurrencyLimit<Inner>
where
Inner: Service<Request>,
{
type Response = Inner::Response;
type Error = Inner::Error;
type Future = Inner::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let current = self.current.load(Ordering::Acquire);
if current < self.limit {
// Capacity available: ready to accept
Poll::Ready(Ok(()))
} else {
// At limit: register for wake-up
self.waiters.push_back(cx.waker().clone());
Poll::Pending
}
}
fn call(&mut self, req: Request) -> Self::Future {
self.current.fetch_add(1, Ordering::AcqRel);
self.inner.call(req)
}
}
// When a request completes:
// 1. Decrement counter
// 2. Wake one waiting task
// 3. Waiting task's poll_ready succeeds
Concurrency limits use poll_ready to queue callers when at capacity.
use tower::Service;
use std::future::Future;
async fn call_service<S, Request>(service: &mut S, request: Request)
-> Result<S::Response, S::Error>
where
S: Service<Request>,
{
    // The canonical pattern:
    // 1. Wait for readiness
    futures::future::poll_fn(|cx| service.poll_ready(cx)).await?;
    // 2. Call the service; by contract, call may only be invoked
    //    after poll_ready has returned Ready(Ok(()))
    let future = service.call(request);
    // 3. Wait for the response
    // Note: some services allow concurrent calls after poll_ready;
    // others require the response to complete before the next poll_ready
    future.await
}
The poll_ready-then-call pattern is the foundation of backpressure.
use hyper::service::{Service, service_fn};
use hyper::{Body, Request, Response, Server};
use std::convert::Infallible;
async fn serve() {
// Hyper's server uses Service trait
// For each incoming connection:
// 1. Accept connection
// 2. Wait for request
// 3. Call poll_ready on service
// 4. If Pending, wait before calling
// 5. Call service with request
    let _service = service_fn(|_req: Request<Body>| async {
        Ok::<_, Infallible>(Response::new(Body::from("Hello")))
    });
    // Hyper manages backpressure automatically:
    // if the service is slow and poll_ready returns Pending,
    // hyper stops reading further requests from that connection
}
Hyper uses poll_ready for automatic backpressure.
use tower::Service;
use tokio::sync::mpsc;
struct ChannelService {
sender: mpsc::Sender<String>,
pending: usize,
max_pending: usize,
}
impl Service<String> for ChannelService {
type Response = ();
    type Error = mpsc::error::TrySendError<String>;
type Future = std::future::Ready<Result<(), Self::Error>>;
fn poll_ready(&mut self, cx: &mut std::task::Context<'_>)
-> std::task::Poll<Result<(), Self::Error>>
{
        if self.pending >= self.max_pending {
            // Channel full: apply backpressure.
            // (A real implementation would store cx.waker() and wake it when
            // capacity frees up; otherwise the task is never re-polled.)
            std::task::Poll::Pending
} else {
std::task::Poll::Ready(Ok(()))
}
}
    fn call(&mut self, req: String) -> Self::Future {
        self.pending += 1; // a real implementation decrements on completion
        std::future::ready(self.sender.try_send(req).map(|_| ()))
    }
}
Bounded channels naturally provide backpressure through capacity limits.
use tower::Service;
use std::task::{Context, Poll};
// Poll::Ready(Ok(())) -> Service ready, call will succeed
// Poll::Ready(Err(_)) -> Service failed, call returns error
// Poll::Pending -> Service busy, wait and retry
struct ServiceWithErrors<Inner> {
inner: Inner,
is_broken: bool,
}
impl<Inner, Request> Service<Request> for ServiceWithErrors<Inner>
where
Inner: Service<Request>,
{
type Response = Inner::Response;
type Error = Inner::Error;
type Future = Inner::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.is_broken {
// Permanent failure: return error
// Caller should not retry
Poll::Ready(Err(/* permanent error */))
} else {
// Forward to inner
self.inner.poll_ready(cx)
}
}
fn call(&mut self, req: Request) -> Self::Future {
self.inner.call(req)
}
}Different Poll variants signal different states.
use tower::Service;
use sqlx::PgPool;
use std::task::{Context, Poll};
struct DatabaseService {
pool: PgPool,
max_connections: u32,
}
impl Service<String> for DatabaseService {
type Response = String;
type Error = sqlx::Error;
type Future = std::future::Ready<Result<String, sqlx::Error>>;
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Check whether the pool has connections to spare.
        // (sqlx's PgPool exposes num_idle() and size(), not a status() struct.)
        if self.pool.num_idle() == 0 && self.pool.size() >= self.max_connections {
            // Pool exhausted: backpressure.
            // (A real implementation must also register _cx.waker() so the
            // task is re-polled when a connection is returned.)
            Poll::Pending
        } else {
            // Pool has capacity
            Poll::Ready(Ok(()))
        }
    }
fn call(&mut self, query: String) -> Self::Future {
// Execute query
std::future::ready(Ok(format!("Result of: {}", query)))
}
}
Database pools use poll_ready to apply backpressure when exhausted.
┌─────────────────────────────────────────┐
│ Caller (client)                         │
│ waits on poll_ready before call         │
└──────────────┬──────────────────────────┘
               │ poll_ready
┌──────────────▼──────────────────────────┐
│ Timeout Layer                           │
│ forwards poll_ready, adds timeout       │
└──────────────┬──────────────────────────┘
               │ poll_ready
┌──────────────▼──────────────────────────┐
│ Concurrency Limit                       │
│ returns Pending if at capacity          │
└──────────────┬──────────────────────────┘
               │ poll_ready
┌──────────────▼──────────────────────────┐
│ Buffer Layer                            │
│ queues requests, backs up when full     │
└──────────────┬──────────────────────────┘
               │ poll_ready
┌──────────────▼──────────────────────────┐
│ Inner Service                           │
│ core logic, may return Pending          │
└─────────────────────────────────────────┘
Backpressure propagates through the middleware stack.
poll_ready enables backpressure by inverting control: instead of services accepting every request and failing under load, services advertise readiness and callers wait.
The mechanism: poll_ready returns Ready(Ok(())) when the service can accept a request, Ready(Err(...)) when the service has failed, or Pending when busy. Callers must wait on Pending before calling call. This creates a natural flow control: busy services return Pending, callers wait, and load stops at the source.
Middleware propagation: Each middleware layer's poll_ready calls its inner service's poll_ready. When the innermost service returns Pending, this propagates through all layers. The entire middleware chain becomes backpressured together. This is compositionally powerful: add a concurrency limit, it naturally applies backpressure to everything above it.
Key insight: Backpressure through poll_ready is cooperative, not preemptive. It relies on callers checking readiness before making requests; in frameworks like Tower and Hyper, the pattern is enforced by the service-call abstractions. The alternative, load shedding, rejects requests explicitly with errors, which pushes failure handling to callers but avoids waiting. Backpressure with poll_ready parks requests in the caller's task, which uses memory but avoids dropped work. Choose based on whether you want callers to wait (backpressure) or handle failures (load shedding).