Loading pageā¦
Rust walkthroughs
Loading pageā¦
tower::util::ServiceExt::ready for checking service availability before sending requests?tower::util::ServiceExt::ready returns a future that resolves when the service is ready to process a request, separating the "check readiness" phase from the "execute request" phaseāthis is essential because Tower services may be in a non-ready state due to rate limiting, circuit breaking, connection backpressure, or other middleware constraints, and calling call on a non-ready service panics or returns an error depending on the implementation. The ready method allows callers to asynchronously wait for service readiness before attempting to send a request, enabling proper backpressure propagation and preventing request failures. This design embodies Tower's core philosophy: services should signal readiness explicitly rather than failing unpredictably when called in a non-ready state.
use tower::Service;
// Simplified Service trait
trait Service<Request> {
type Response;
type Error;
type Future: Future<Output = Result<Self::Response, Self::Error>>;
// Returns Poll::Ready(Ok(())) if service can process requests
// Returns Poll::Pending if service is not ready
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
// Actually process the request - MUST only be called when ready
fn call(&mut self, req: Request) -> Self::Future;
}poll_ready checks if the service can accept requests; call executes the request.
use tower::Service;
use std::task::{Poll, Context};
fn why_readiness_matters<S, Request>(mut service: S, request: Request)
where
S: Service<Request>,
{
// WRONG: Calling without checking readiness
// This may panic or return an error
let future = service.call(request);
// Services may be not ready due to:
// - Rate limiting (too many requests)
// - Circuit breaker (circuit is open)
// - Connection pool exhaustion
// - Buffer full (load shedding)
// - Timeout cooldown
}Calling call when the service is not ready violates the Service contract.
use tower::util::ServiceExt;
use tower::Service;
async fn proper_service_call<S, Request>(mut service: S, request: Request)
where
S: Service<Request>,
{
// CORRECT: Wait for readiness first
service.ready().await.unwrap();
// Now safe to call
let response = service.call(request).await.unwrap();
}ready() returns a future that resolves when poll_ready returns Ready.
use tower::Service;
use std::task::{Poll, Context};
// Simplified view of ready()
async fn ready<S, Request>(mut service: S)
where
S: Service<Request>,
{
// This is essentially what ready() does:
std::future::poll_fn(move |cx| {
service.poll_ready(cx)
}).await;
// Alternative implementation:
// loop {
// match service.poll_ready(&mut cx) {
// Poll::Ready(Ok(())) => return,
// Poll::Ready(Err(e)) => panic!("service failed"),
// Poll::Pending => cx.waker().wake_by_ref(),
// }
// }
}ready() wraps poll_ready in an async-friendly interface.
use tower::Service;
use std::task::{Poll, Context};
// Example: A rate-limited service
struct RateLimitedService<S> {
inner: S,
requests_this_second: usize,
max_requests: usize,
}
impl<S, Request> Service<Request> for RateLimitedService<S>
where
S: Service<Request>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.requests_this_second < self.max_requests {
// Ready: can accept requests
Poll::Ready(Ok(()))
} else {
// Not ready: rate limit exceeded
// Need to wait for rate limit window to reset
Poll::Pending
}
}
fn call(&mut self, req: Request) -> Self::Future {
self.requests_this_second += 1;
self.inner.call(req)
}
}A service is "not ready" when it cannot currently process requests.
use tower::Service;
use std::task::{Poll, Context};
// Circuit breaker middleware
struct CircuitBreaker<S> {
inner: S,
state: CircuitState,
failure_count: usize,
threshold: usize,
}
enum CircuitState {
Closed, // Normal operation
Open, // Failing, reject requests
HalfOpen, // Testing if recovered
}
impl<S, Request> Service<Request> for CircuitBreaker<S>
where
S: Service<Request>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.state {
CircuitState::Closed => {
// Delegate to inner service
self.inner.poll_ready(cx)
}
CircuitState::Open => {
// Not ready: circuit is open
// Reject new requests until timeout
Poll::Pending
}
CircuitState::HalfOpen => {
// Ready: allow one test request
Poll::Ready(Ok(()))
}
}
}
fn call(&mut self, req: Request) -> Self::Future {
self.inner.call(req)
}
}Circuit breaker uses readiness to prevent calls when the circuit is open.
use tower::Service;
use tower::util::ServiceExt;
async fn two_phase_protocol<S, Request>(mut service: S, request: Request)
where
S: Service<Request> + Clone,
S::Error: std::fmt::Debug,
{
// Phase 1: Wait for readiness
// This is where backpressure is applied
let ready_service = service.ready().await.unwrap();
// Phase 2: Execute the request
// This MUST be done on a ready service
let response = ready_service.call(request).await.unwrap();
}The two phases separate waiting (backpressure) from execution (request processing).
use tower::Service;
use tower::util::ServiceExt;
async fn compose_with_ready<S, Request>(service: S, request: Request)
where
S: Service<Request> + Clone,
{
// ready_and_then pattern
service
.ready()
.and_then(|ready_service| ready_service.call(request))
.await
.unwrap();
}ready() returns a Ready wrapper that can be composed with other futures.
use tower::Service;
use tower::util::ServiceExt;
async fn ready_and_call<S, Request>(mut service: S, request: Request)
where
S: Service<Request>,
{
// Many services need to be "reserved" before calling
// ready_and() combines ready() and call()
let response = service.ready_and().call(request).await;
}ready_and() is a convenience that combines readiness check with call.
use tower::Service;
use tower::util::ServiceExt;
use std::collections::VecDeque;
// A bounded buffer service
struct Buffer<S, Request> {
inner: S,
queue: VecDeque<Request>,
max_size: usize,
}
impl<S, Request> Service<Request> for Buffer<S, Request>
where
S: Service<Request>,
{
type Response = S::Response;
type Error = S::Error;
type Future = /* ... */;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.queue.len() < self.max_size {
Poll::Ready(Ok(())) // Buffer has room
} else {
// Buffer full - apply backpressure
// Callers waiting on ready() will block
Poll::Pending
}
}
fn call(&mut self, req: Request) -> Self::Future {
self.queue.push_back(req);
// Process queue...
}
}
// Backpressure propagation:
// Client -> ready().await -> Buffer (full?) -> Inner service
// If buffer is full, client waits at ready()ready() propagates backpressure from inner services to callers.
use tower::Service;
use std::task::{Poll, Context};
use std::pin::Pin;
use std::future::Future;
// Using ready() (recommended for async code)
async fn with_ready<S, Request>(mut service: S, request: Request)
where
S: Service<Request>,
{
service.ready().await.unwrap();
service.call(request).await.unwrap();
}
// Using poll_ready() directly (for implementing Services)
fn with_poll_ready<S, Request>(mut service: S, request: Request, cx: &mut Context<'_>)
where
S: Service<Request> + Unpin,
{
loop {
match service.poll_ready(cx) {
Poll::Ready(Ok(())) => {
// Ready to call
let future = service.call(request);
// Need to poll the future...
break;
}
Poll::Ready(Err(e)) => {
// Service failed
panic!("Service error: {:?}", e);
}
Poll::Pending => {
// Not ready yet, will be woken up
// Continue polling...
}
}
}
}Use ready() in async code; poll_ready() when implementing Future or Service.
use tower::Service;
use tower::util::ServiceExt;
async fn handle_unavailable<S, Request>(mut service: S, request: Request)
where
S: Service<Request>,
S::Error: Into<anyhow::Error>,
{
match service.ready().await {
Ok(ready_service) => {
// Service is ready, proceed with call
match ready_service.call(request).await {
Ok(response) => response,
Err(e) => {
// Handle call error
Err(e.into())
}
}
}
Err(e) => {
// Service failed to become ready
// This indicates a more serious problem
// e.g., inner service crashed, circuit breaker permanently open
Err(e.into())
}
}
}ready() can fail if the service cannot become ready (e.g., permanent failure).
use tower::Service;
use tower::util::ServiceExt;
use tokio::time::{timeout, Duration};
async fn ready_with_timeout<S, Request>(mut service: S, request: Request)
where
S: Service<Request>,
{
// Wait for readiness with timeout
match timeout(Duration::from_secs(5), service.ready()).await {
Ok(Ok(ready_service)) => {
// Ready within timeout
ready_service.call(request).await
}
Ok(Err(e)) => {
// Service error during readiness check
Err(e)
}
Err(_) => {
// Timeout waiting for readiness
// Service may be overloaded or stuck
Err(anyhow::anyhow!("Service readiness timeout"))
}
}
}Combine ready() with timeout to prevent indefinite waiting.
use tower::Service;
use tower::util::ServiceExt;
use tower::limit::concurrency::ConcurrencyLimit;
use tower::timeout::Timeout;
use tower::retry::Retry;
use tower::load_balance::LoadBalance;
async fn composed_readiness<Backend, Request>(
service: ConcurrencyLimit<Retry<Timeout<LoadBalance<Backend>>>>,
request: Request,
) where
Backend: Service<Request> + Clone,
{
// Multiple layers may affect readiness:
// 1. ConcurrencyLimit: Not ready if concurrent limit reached
// 2. Retry: Usually ready (delegates to inner)
// 3. Timeout: Usually ready (delegates to inner)
// 4. LoadBalance: Ready if any backend is ready
// ready() waits for ALL layers to be ready
let ready_service = service.ready().await.unwrap();
// Now execute through all layers
ready_service.call(request).await
}ready() propagates through the entire service stack.
use tower::Service;
use std::task::{Poll, Context};
// A simple echo service is always ready
struct EchoService;
impl Service<String> for EchoService {
type Response = String;
type Error = std::convert::Infallible;
type Future = std::future::Ready<Result<String, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Always ready - no state to manage
Poll::Ready(Ok(()))
}
fn call(&mut self, req: String) -> Self::Future {
std::future::ready(Ok(req))
}
}
// For such services, ready() resolves immediately
// But calling ready() is still good practice for consistencySimple services may always be ready, but ready() is still correct.
use tower::Service;
use std::task::{Poll, Context};
// A service that must connect first
struct ConnectingService {
connection_state: ConnectionState,
}
enum ConnectionState {
Disconnected,
Connecting,
Connected,
}
impl Service<String> for ConnectingService {
type Response = String;
type Error = anyhow::Error;
type Future = /* ... */;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.connection_state {
ConnectionState::Connected => Poll::Ready(Ok(())),
ConnectionState::Disconnected => {
// Start connection
self.connection_state = ConnectionState::Connecting;
// Schedule async connection...
Poll::Pending
}
ConnectionState::Connecting => {
// Check if connection complete
// If connected: Poll::Ready(Ok(()))
// If still connecting: Poll::Pending
Poll::Pending
}
}
}
fn call(&mut self, req: String) -> Self::Future {
// Assume connected since ready() was called
// ...
}
}Services can use readiness to manage connection state.
use tower::Service;
use tower::util::ServiceExt;
// WITHOUT ready() - incorrect usage
async fn without_ready<S, Request>(mut service: S, request: Request)
where
S: Service<Request>,
{
// May panic if service not ready!
match service.call(request).await {
Ok(response) => response,
Err(e) => {
// Error might be from calling unready service
// Or from the service itself
// Hard to distinguish
Err(e)
}
}
}
// WITH ready() - correct usage
async fn with_ready<S, Request>(mut service: S, request: Request)
where
S: Service<Request>,
{
// Wait for readiness first
service.ready().await.map_err(|e| {
// Clear error: service not ready
e
})?;
// Now call is safe
service.call(request).await.map_err(|e| {
// Error is from the service itself
e
})
}ready() separates readiness errors from service errors.
use tower::Service;
use tower::util::ServiceExt;
use tower::load_balance::LoadBalance;
async fn load_balanced_call<S, Request>(
mut load_balance: LoadBalance<S>,
request: Request,
) where
S: Service<Request> + Clone,
{
// ready() finds an available backend
// LoadBalance implements poll_ready to select a backend
load_balance.ready().await.unwrap();
// call() uses the selected backend
load_balance.call(request).await
}Load balancers use readiness to select from available backends.
use tower::Service;
use std::task::{Poll, Context};
// The Service trait has an implicit contract:
//
// 1. poll_ready() returns Ready(Ok(())) when service can process requests
// 2. poll_ready() returns Pending when service cannot currently accept requests
// 3. poll_ready() returns Ready(Err(e)) when service has failed
// 4. call() should ONLY be called after poll_ready() returns Ready(Ok(()))
// 5. After call(), must call poll_ready() again before next call()
//
// Violating this contract leads to:
// - Panics (some implementations)
// - Undefined behavior
// - Incorrect request processing
fn service_contract_example<S, Request>(mut service: S, request: Request)
where
S: Service<Request>,
{
// Correct sequence:
// 1. Call poll_ready until Ready(Ok)
// 2. Call call once
// 3. Repeat from step 1 for next request
// ready() does step 1
// call() does step 2
// The pattern repeats for each request
}The Service contract requires checking readiness before each call.
What ready() does:
// ready() returns a future that:
// 1. Polls poll_ready() until it returns Ready
// 2. Returns the service ready for call()
// 3. Allows async/await syntax for readiness check
// Equivalent manual implementation:
async fn ready_manual<S, Request>(mut service: S)
where
S: Service<Request>,
{
std::future::poll_fn(move |cx| {
service.poll_ready(cx)
}).await
}Why readiness matters:
// Services may be not ready due to:
// - Rate limiting (too many requests)
// - Concurrency limits (too many in-flight)
// - Circuit breakers (circuit open)
// - Connection pool exhaustion
// - Buffer full (load shedding)
// - Service discovery (no backends available)
// - Initialization (connecting, warming up)Correct usage pattern:
// For each request:
// 1. await service.ready()
// 2. await service.call(request)
// 3. Handle response/error
// The ready() call is where backpressure is applied
// Callers wait at ready() when service is overloadedKey insight: tower::util::ServiceExt::ready exists because Tower services model readiness as a first-class conceptāservices may temporarily be unable to accept requests due to rate limits, circuit breakers, connection limits, or other constraints, and calling call on a non-ready service violates the Service contract. ready() provides an async interface to poll_ready, allowing callers to wait for service readiness before making a call, which propagates backpressure up the call stack and prevents request failures. This two-phase protocol (ready then call) is fundamental to Tower's design: it separates the question "can I handle this request?" from "please handle this request," enabling middleware like rate limiters and circuit breakers to reject requests gracefully at the readiness phase rather than failing during execution.