Rust walkthroughs
How does the tower::retry::Retry middleware decide when to retry a failed request?

tower::retry::Retry uses a policy trait that examines each response to determine whether a retry should occur, delegating the decision logic entirely to the policy implementation rather than hardcoding retry conditions. The policy receives both the original request and the result (a response or an error), returning Some with a future that resolves to the policy state for the next attempt, or None to hand the result back to the caller. This design allows arbitrary retry conditions (error types, status codes, response headers, even response body content) to drive decisions. The policy also controls retry limits through the retry method's return value, enabling sophisticated backoff strategies through nested policies or custom implementations.
use tower::retry::Policy;
use std::future::Future;
// Simplified trait definition
pub trait Policy<Req, Res, E>: Sized {
type Future: Future<Output = Self>;
fn retry(&self, req: &Req, result: Result<&Res, &E>) -> Option<Self::Future>;
fn clone_request(&self, req: &Req) -> Option<Req>;
}

The policy examines the request and result, returning a future that resolves to the next policy state.
use tower::retry::backoff::{Backoff, ExponentialBackoffMaker, MakeBackoff};
use tower::util::rng::HasherRng;
use std::time::Duration;
#[tokio::main]
async fn main() {
// ExponentialBackoff implements Backoff (a source of delay futures),
// not Policy, so it supplies retry timing rather than retry decisions.
let mut maker = ExponentialBackoffMaker::new(
Duration::from_millis(10), // minimum delay
Duration::from_secs(1), // maximum delay
0.5, // jitter percentage
HasherRng::default(),
)
.expect("valid backoff configuration");
let mut backoff = maker.make_backoff();
backoff.next_backoff().await; // wait out the next exponential delay
}

Tower's backoff utilities, such as ExponentialBackoff, supply the timing half of common retry patterns; a custom Policy decides when to retry and can hold a Backoff to decide how long to wait.
use tower::retry::Policy;
use std::future::Future;
use std::pin::Pin;
#[derive(Clone)]
struct RetryOnError<E> {
max_retries: usize,
attempts: usize,
retryable_errors: Vec<E>,
}
impl<E: Clone + PartialEq + Send + 'static> Policy<(), (), E> for RetryOnError<E> {
type Future = Pin<Box<dyn Future<Output = Self> + Send>>;
fn retry(&self, _req: &(), result: Result<&(), &E>) -> Option<Self::Future> {
match result {
Ok(_) => None, // Success, don't retry
Err(error) => {
// Only retry if error is in our retryable list
if self.retryable_errors.contains(error) && self.attempts < self.max_retries {
let next_policy = RetryOnError {
max_retries: self.max_retries,
attempts: self.attempts + 1,
retryable_errors: self.retryable_errors.clone(),
};
Some(Box::pin(async move { next_policy }))
} else {
None // Non-retryable error or max retries exceeded
}
}
}
}
fn clone_request(&self, req: &()) -> Option<()> {
Some(req.clone())
}
}

The policy checks the error type to decide if retrying makes sense.
use tower::retry::Policy;
use http::{Request, Response, StatusCode};
use std::future::Future;
use std::pin::Pin;
#[derive(Clone)]
struct RetryOnStatus {
max_retries: usize,
attempts: usize,
retryable_statuses: Vec<StatusCode>,
}
impl<Body> Policy<Request<Body>, Response<Body>, Box<dyn std::error::Error + Send + Sync>>
for RetryOnStatus
where
Body: Clone,
{
type Future = Pin<Box<dyn Future<Output = Self> + Send>>;
fn retry(
&self,
_req: &Request<Body>,
result: Result<&Response<Body>, &Box<dyn std::error::Error + Send + Sync>>
) -> Option<Self::Future> {
match result {
Ok(response) => {
// Retry on specific status codes
let status = response.status();
if self.retryable_statuses.contains(&status) && self.attempts < self.max_retries {
let next = RetryOnStatus {
max_retries: self.max_retries,
attempts: self.attempts + 1,
retryable_statuses: self.retryable_statuses.clone(),
};
Some(Box::pin(async move { next }))
} else {
None
}
}
Err(_) => {
// Retry on errors (connection failures, etc.)
if self.attempts < self.max_retries {
let next = RetryOnStatus {
max_retries: self.max_retries,
attempts: self.attempts + 1,
retryable_statuses: self.retryable_statuses.clone(),
};
Some(Box::pin(async move { next }))
} else {
None
}
}
}
}
fn clone_request(&self, req: &Request<Body>) -> Option<Request<Body>> {
// http::Request does not implement Clone, so rebuild the request
// piece by piece. Extensions are skipped; they cannot be cloned generically.
let mut new_req = Request::builder()
.method(req.method().clone())
.uri(req.uri().clone())
.version(req.version())
.body(req.body().clone())
.ok()?;
new_req.headers_mut().clone_from(req.headers());
Some(new_req)
}
}

HTTP clients often retry on 5xx server errors or 429 rate limits.
use tower::retry::Policy;
use std::future::Future;
use std::pin::Pin;
#[derive(Clone, Debug)]
struct CountingPolicy {
attempts: usize,
max_attempts: usize,
}
impl<Req: Clone, Res, E> Policy<Req, Res, E> for CountingPolicy {
type Future = Pin<Box<dyn Future<Output = Self> + Send>>;
fn retry(&self, _req: &Req, result: Result<&Res, &E>) -> Option<Self::Future> {
// Policy is stateful - it tracks attempts
println!("Retry policy called. Attempts so far: {}", self.attempts);
if result.is_ok() {
None // Success, nothing to retry
} else if self.attempts >= self.max_attempts {
println!("Max attempts reached, not retrying");
None
} else {
// Return future that resolves to updated policy
let next = CountingPolicy {
attempts: self.attempts + 1,
max_attempts: self.max_attempts,
};
Some(Box::pin(async move {
println!("Preparing for attempt {}", next.attempts + 1);
next
}))
}
}
fn clone_request(&self, req: &Req) -> Option<Req> {
Some(req.clone())
}
}

The policy returns a new instance of itself with updated state for each attempt.
use tower::retry::Policy;
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;
#[derive(Clone)]
struct BackoffPolicy {
max_retries: usize,
attempts: usize,
base_delay: Duration,
max_delay: Duration,
}
impl BackoffPolicy {
fn current_delay(&self) -> Duration {
let delay = self.base_delay * 2u32.saturating_pow(self.attempts.min(31) as u32);
delay.min(self.max_delay)
}
}
impl<Req: Clone, Res, E> Policy<Req, Res, E> for BackoffPolicy {
type Future = Pin<Box<dyn Future<Output = Self> + Send>>;
fn retry(&self, _req: &Req, _result: Result<&Res, &E>) -> Option<Self::Future> {
if self.attempts >= self.max_retries {
return None;
}
let delay = self.current_delay();
let next_attempts = self.attempts + 1;
let max_retries = self.max_retries;
let base_delay = self.base_delay;
let max_delay = self.max_delay;
Some(Box::pin(async move {
tokio::time::sleep(delay).await;
BackoffPolicy {
max_retries,
attempts: next_attempts,
base_delay,
max_delay,
}
}))
}
fn clone_request(&self, req: &Req) -> Option<Req> {
Some(req.clone())
}
}

The policy future can include delays, implementing backoff strategies.
use tower::{Service, ServiceExt};
use tower::retry::Retry;
use http::{Request, Response};
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
#[derive(Clone)]
struct FlakyService {
call_count: Arc<AtomicUsize>,
}
impl Service<Request<String>> for FlakyService {
type Response = Response<String>;
type Error = String;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
std::task::Poll::Ready(Ok(()))
}
fn call(&mut self, _req: Request<String>) -> Self::Future {
let count = self.call_count.fetch_add(1, Ordering::SeqCst);
Box::pin(async move {
if count < 2 {
// Fail first 2 calls
Err(format!("Connection failed (attempt {})", count + 1))
} else {
// Succeed on 3rd call
Ok(Response::builder()
.status(200)
.body("Success!".to_string())
.unwrap())
}
})
}
}
#[tokio::main]
async fn main() {
let policy = CountingPolicy {
attempts: 0,
max_attempts: 5,
};
let service = FlakyService {
call_count: Arc::new(AtomicUsize::new(0)),
};
let mut retry_service = Retry::new(policy, service);
let request = Request::builder()
.body("test".to_string())
.unwrap();
// Drive the service to readiness before calling it, per the Service contract
match retry_service.ready().await.expect("service ready").call(request).await {
Ok(response) => println!("Got response: {:?}", response.status()),
Err(e) => println!("Failed after retries: {}", e),
}
}

Retry wraps a service and applies the policy on each failure.
use tower::retry::Policy;
// The policy is called in these situations:
// 1. After successful response - decide if retry needed (e.g., 429)
// 2. After error response - decide if error is retryable
// 3. After cloned request failure - policy controls continuation
// The policy examines:
// - The original request (for reference, can modify for retry)
// - The result: Ok(&Response) or Err(&Error)
// - Internal state (attempt count, delays, etc.)

The policy has full context to make retry decisions.
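That call sequence can be modeled as a small loop. The sketch below uses plain Rust with no tower types (AttemptPolicy and drive are illustrative names), and is synchronous for clarity, to show the state-machine shape the Retry middleware follows:

```rust
// Simplified, synchronous model of the retry loop: call the service,
// consult the policy, and either return the result or loop with the
// policy's next state.
#[derive(Clone)]
struct AttemptPolicy {
    attempts: usize,
    max: usize,
}

impl AttemptPolicy {
    // Mirrors Policy::retry: None means "return the result to the caller",
    // Some(next) means "try again with this updated policy state".
    fn retry(&self, result: &Result<u32, &'static str>) -> Option<AttemptPolicy> {
        match result {
            Ok(_) => None,
            Err(_) if self.attempts < self.max => Some(AttemptPolicy {
                attempts: self.attempts + 1,
                max: self.max,
            }),
            Err(_) => None,
        }
    }
}

fn drive(
    mut policy: AttemptPolicy,
    mut service: impl FnMut() -> Result<u32, &'static str>,
) -> Result<u32, &'static str> {
    loop {
        let result = service();
        match policy.retry(&result) {
            Some(next) => policy = next, // retry with updated state
            None => return result,       // success or give-up
        }
    }
}

fn main() {
    let mut calls = 0;
    let result = drive(AttemptPolicy { attempts: 0, max: 5 }, || {
        calls += 1;
        if calls < 3 { Err("transient") } else { Ok(42) }
    });
    assert_eq!(result, Ok(42));
    assert_eq!(calls, 3); // two failures, then success
}
```

The real middleware differs mainly in that retry returns a future, so the "loop" step can await a delay before the next attempt.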
use tower::retry::Policy;
use http::{Request, Response, StatusCode};
use std::future::Future;
use std::pin::Pin;
#[derive(Clone)]
struct SmartRetryPolicy {
attempts: usize,
max_attempts: usize,
}
impl SmartRetryPolicy {
fn is_retryable_error(&self, error: &Box<dyn std::error::Error + Send + Sync>) -> bool {
// Check for transient errors by message; a heuristic. Real code should
// downcast to concrete error types where possible.
let message = error.to_string();
message.contains("connection reset") || message.contains("timeout")
}
fn is_retryable_status(&self, status: StatusCode) -> bool {
matches!(status,
StatusCode::REQUEST_TIMEOUT |
StatusCode::TOO_MANY_REQUESTS |
StatusCode::INTERNAL_SERVER_ERROR |
StatusCode::BAD_GATEWAY |
StatusCode::SERVICE_UNAVAILABLE |
StatusCode::GATEWAY_TIMEOUT
)
}
}
impl<Body: Clone> Policy<Request<Body>, Response<Body>, Box<dyn std::error::Error + Send + Sync>>
for SmartRetryPolicy
{
type Future = Pin<Box<dyn Future<Output = Self> + Send>>;
fn retry(
&self,
_req: &Request<Body>,
result: Result<&Response<Body>, &Box<dyn std::error::Error + Send + Sync>>
) -> Option<Self::Future> {
if self.attempts >= self.max_attempts {
return None;
}
let should_retry = match result {
Ok(response) => self.is_retryable_status(response.status()),
Err(error) => self.is_retryable_error(error),
};
if should_retry {
let next_attempts = self.attempts + 1;
let max_attempts = self.max_attempts; // copy out; `self` cannot move into the 'static future
Some(Box::pin(async move {
SmartRetryPolicy {
attempts: next_attempts,
max_attempts,
}
}))
} else {
None
}
}
fn clone_request(&self, req: &Request<Body>) -> Option<Request<Body>> {
// http::Request does not implement Clone; rebuild the request instead
let mut new_req = Request::builder()
.method(req.method().clone())
.uri(req.uri().clone())
.version(req.version())
.body(req.body().clone())
.ok()?;
new_req.headers_mut().clone_from(req.headers());
Some(new_req)
}
}

Combine multiple conditions in a single policy.
use tower::retry::Policy;
// The clone_request method is critical:
// - Retries need to send the request again
// - The original request may have been consumed
// - Policy must produce a fresh copy
// Sketch only: SomePolicy stands in for any policy type, and the other
// trait items (type Future, fn retry) are omitted.
impl<Req: Clone, Res, E> Policy<Req, Res, E> for SomePolicy {
fn clone_request(&self, req: &Req) -> Option<Req> {
// If None is returned, retry is not possible
// This happens when the request body is not clonable
Some(req.clone())
}
}
// For streaming bodies, cloning may not be possible
// In that case, the policy returns None, preventing retries

Requests must be clonable for retries to work.
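As a concrete illustration of that rule, here is a toy body type (not tower's; the names are invented for this sketch) where only the buffered variant can produce a fresh copy, and a streaming body makes the clone attempt return None:

```rust
// Hypothetical body type: buffered bodies can be cloned, streaming
// bodies cannot because the underlying stream is one-shot.
#[derive(Debug)]
enum Body {
    Buffered(Vec<u8>),
    Streaming, // stands in for a non-rewindable byte stream
}

// Mirrors what a Policy's clone_request must do: produce a fresh copy,
// or return None to signal that retrying is impossible.
fn try_clone_body(body: &Body) -> Option<Body> {
    match body {
        Body::Buffered(bytes) => Some(Body::Buffered(bytes.clone())),
        Body::Streaming => None, // stream may already be (partially) consumed
    }
}

fn main() {
    assert!(try_clone_body(&Body::Buffered(b"hello".to_vec())).is_some());
    assert!(try_clone_body(&Body::Streaming).is_none());
}
```

Buffering the body up front (at a memory cost) is the usual way to make streaming requests retryable.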
use tower::retry::Policy;
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
#[derive(Clone)]
struct JitteredBackoffPolicy {
attempts: usize,
max_attempts: usize,
base_delay: Duration,
jitter_factor: f64,
rng_state: Arc<AtomicU64>,
}
impl JitteredBackoffPolicy {
fn jittered_delay(&self) -> Duration {
let base = self.base_delay.as_millis() as f64 * 2f64.powi(self.attempts as i32);
// Derive pseudo-random bits from the shared state with a
// splitmix64-style scramble; a bare counter would yield almost no jitter
let mut x = self.rng_state.fetch_add(0x9E37_79B9_7F4A_7C15, Ordering::Relaxed);
x = (x ^ (x >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
x = (x ^ (x >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
x ^= x >> 31;
let jitter = (x as f64 / u64::MAX as f64) * self.jitter_factor;
let delay_ms = base * (1.0 + jitter);
Duration::from_millis(delay_ms as u64)
}
}
impl<Req: Clone, Res, E> Policy<Req, Res, E> for JitteredBackoffPolicy {
type Future = Pin<Box<dyn Future<Output = Self> + Send>>;
fn retry(&self, _req: &Req, _result: Result<&Res, &E>) -> Option<Self::Future> {
if self.attempts >= self.max_attempts {
return None;
}
let delay = self.jittered_delay();
let next_attempts = self.attempts + 1;
let base_delay = self.base_delay;
let jitter_factor = self.jitter_factor;
let rng_state = self.rng_state.clone();
let max_attempts = self.max_attempts;
Some(Box::pin(async move {
tokio::time::sleep(delay).await;
JitteredBackoffPolicy {
attempts: next_attempts,
max_attempts,
base_delay,
jitter_factor,
rng_state,
}
}))
}
fn clone_request(&self, req: &Req) -> Option<Req> {
Some(req.clone())
}
}

Jitter prevents thundering herd problems when many clients retry simultaneously.
use tower::retry::Policy;
use http::{Request, Response};
use std::future::Future;
use std::pin::Pin;
#[derive(Clone)]
struct BodyInspectionPolicy {
attempts: usize,
max_attempts: usize,
}
impl BodyInspectionPolicy {
fn is_retryable_body(body: &[u8]) -> bool {
// Some APIs return 200 with an error in the body
let text = String::from_utf8_lossy(body);
text.contains("\"retryable\":true") || text.contains("\"error\":\"transient\"")
}
}
// Note: Reading the body consumes it, so you need special handling
// This typically requires buffering the body or using a policy
// that can work with already-examined responses
// A more practical approach uses response headers for decisions
impl<Body: Clone + AsRef<[u8]>> Policy<Request<Body>, Response<Vec<u8>>, Box<dyn std::error::Error + Send + Sync>>
for BodyInspectionPolicy
{
type Future = Pin<Box<dyn Future<Output = Self> + Send>>;
fn retry(
&self,
_req: &Request<Body>,
result: Result<&Response<Vec<u8>>, &Box<dyn std::error::Error + Send + Sync>>
) -> Option<Self::Future> {
if self.attempts >= self.max_attempts {
return None;
}
match result {
Ok(response) => {
// Check body content for retry indicators
if Self::is_retryable_body(response.body()) {
let next_attempts = self.attempts + 1;
let max_attempts = self.max_attempts; // copy out; `self` cannot move into the future
return Some(Box::pin(async move {
BodyInspectionPolicy {
attempts: next_attempts,
max_attempts,
}
}));
}
None
}
Err(_) => {
let next_attempts = self.attempts + 1;
let max_attempts = self.max_attempts; // copy out; `self` cannot move into the future
Some(Box::pin(async move {
BodyInspectionPolicy {
attempts: next_attempts,
max_attempts,
}
}))
}
}
}
fn clone_request(&self, req: &Request<Body>) -> Option<Request<Body>> {
// http::Request does not implement Clone; rebuild the request instead
let mut new_req = Request::builder()
.method(req.method().clone())
.uri(req.uri().clone())
.version(req.version())
.body(req.body().clone())
.ok()?;
new_req.headers_mut().clone_from(req.headers());
Some(new_req)
}
}

Body inspection requires careful handling since reading consumes the body.
use tower::{service_fn, ServiceBuilder};
use tower::retry::RetryLayer;
use std::time::Duration;
#[tokio::main]
async fn main() {
// RetryLayer turns a Policy into middleware; CountingPolicy is the
// stateful policy from the earlier example
let retry_layer = RetryLayer::new(CountingPolicy {
attempts: 0,
max_attempts: 3,
});
// Compose retry with other middleware. Ordering matters: the timeout
// here is outermost, so it bounds the whole retry sequence; add it
// after the retry layer to bound each individual attempt instead.
let _service = ServiceBuilder::new()
.timeout(Duration::from_secs(5))
.concurrency_limit(10)
.layer(retry_layer)
.service(service_fn(|req: String| async move {
Ok::<_, std::convert::Infallible>(req)
}));
}

Retry integrates with Tower's middleware stack.
use tower::retry::Policy;
use http::{Method, Request, Response};
use std::future::Future;
use std::pin::Pin;
#[derive(Clone)]
struct IdempotentRetryPolicy {
attempts: usize,
max_attempts: usize,
}
impl IdempotentRetryPolicy {
fn is_idempotent(&self, method: &Method) -> bool {
// Only retry idempotent methods by default
matches!(method,
Method::GET |
Method::HEAD |
Method::OPTIONS |
Method::PUT |
Method::DELETE
)
}
}
impl<Body: Clone> Policy<Request<Body>, Response<Body>, Box<dyn std::error::Error + Send + Sync>>
for IdempotentRetryPolicy
{
type Future = Pin<Box<dyn Future<Output = Self> + Send>>;
fn retry(
&self,
req: &Request<Body>,
_result: Result<&Response<Body>, &Box<dyn std::error::Error + Send + Sync>>
) -> Option<Self::Future> {
// Only retry idempotent methods
if !self.is_idempotent(req.method()) {
return None;
}
if self.attempts >= self.max_attempts {
return None;
}
let next_attempts = self.attempts + 1;
let max_attempts = self.max_attempts; // copy out; `self` cannot move into the future
Some(Box::pin(async move {
IdempotentRetryPolicy {
attempts: next_attempts,
max_attempts,
}
}))
}
fn clone_request(&self, req: &Request<Body>) -> Option<Request<Body>> {
// http::Request does not implement Clone; rebuild the request instead
let mut new_req = Request::builder()
.method(req.method().clone())
.uri(req.uri().clone())
.version(req.version())
.body(req.body().clone())
.ok()?;
new_req.headers_mut().clone_from(req.headers());
Some(new_req)
}
}

Safe retrying considers whether the request can be safely repeated.
use tower::retry::Policy;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
#[derive(Clone)]
struct CircuitBreakerPolicy {
attempts: usize,
max_attempts: usize,
circuit_open: Arc<AtomicBool>,
}
impl<Req: Clone, Res, E> Policy<Req, Res, E> for CircuitBreakerPolicy {
type Future = Pin<Box<dyn Future<Output = Self> + Send>>;
fn retry(&self, _req: &Req, _result: Result<&Res, &E>) -> Option<Self::Future> {
// Check circuit breaker before retrying
if self.circuit_open.load(Ordering::Relaxed) {
println!("Circuit breaker open, not retrying");
return None;
}
if self.attempts >= self.max_attempts {
return None;
}
let next_attempts = self.attempts + 1;
let max_attempts = self.max_attempts; // copy out; `self` cannot move into the future
let circuit_open = self.circuit_open.clone();
Some(Box::pin(async move {
CircuitBreakerPolicy {
attempts: next_attempts,
max_attempts,
circuit_open,
}
}))
}
fn clone_request(&self, req: &Req) -> Option<Req> {
Some(req.clone())
}
}

Policies can incorporate circuit breaker patterns.
| Decision Factor | How Policy Uses It |
|----------------|---------------------|
| Error type | Check Err(&E) variant, retry on transient errors |
| Status code | Check Response.status(), retry on 5xx, 429 |
| Response body | Examine body content for retry signals |
| Attempt count | Compare self.attempts against max_attempts |
| Request method | Only retry idempotent methods |
| Custom headers | Check Retry-After header for timing |
| Circuit breaker | External state can block retries |
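Several of the factors in the table compose naturally into a single pure predicate. This standalone sketch uses illustrative names and plain numeric status codes rather than http types:

```rust
// Pure decision function combining factors from the table above:
// request method idempotency, retryable status codes, and attempt budget.
fn should_retry(method: &str, status: u16, attempts: usize, max_attempts: usize) -> bool {
    // Only methods that are safe to repeat
    let idempotent = matches!(method, "GET" | "HEAD" | "OPTIONS" | "PUT" | "DELETE");
    // Timeouts, rate limits, and transient server errors
    let retryable_status = matches!(status, 408 | 429 | 500 | 502 | 503 | 504);
    idempotent && retryable_status && attempts < max_attempts
}

fn main() {
    assert!(should_retry("GET", 503, 0, 3));   // idempotent + retryable status
    assert!(!should_retry("POST", 503, 0, 3)); // non-idempotent method
    assert!(!should_retry("GET", 404, 0, 3));  // not a transient status
    assert!(!should_retry("GET", 503, 3, 3));  // attempt budget exhausted
}
```

Keeping the predicate pure like this makes the retry logic trivial to unit test apart from the middleware stack.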
tower::retry::Retry delegates retry decisions to the Policy trait, making it fully customizable:
The policy is a state machine: Each call to retry returns either None (stop retrying) or Some(future) (continue retrying with updated state). The future resolves to the next policy instance, which tracks attempt count and can include delays. This pattern lets the policy evolve: counting attempts, tracking backoff timing, and maintaining any custom state.
The decision is fully in policy control: The policy sees both the request and result (success or error), and can base decisions on any factor. Common patterns include retrying transient errors (timeouts, connection resets), retryable status codes (503, 429), and request properties (idempotency). The policy can even inspect response bodies or headers for retry signals.
The future enables delays: Returning a future from retry isn't just for async; it's for timing. The policy future can sleep before resolving, implementing exponential backoff, jitter, or rate-limit-aware delays based on Retry-After headers.
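For the rate-limit case, the delay can come straight from the server. This sketch handles only the delay-seconds form of the Retry-After header (the HTTP-date form would need date parsing and is omitted):

```rust
use std::time::Duration;

// Turn a Retry-After header value into the delay the policy future
// should sleep for, falling back to the policy's own backoff delay
// when the value is absent or unparseable.
fn delay_from_retry_after(header: &str, fallback: Duration) -> Duration {
    header
        .trim()
        .parse::<u64>()
        .map(Duration::from_secs)
        .unwrap_or(fallback)
}

fn main() {
    let fallback = Duration::from_millis(500);
    assert_eq!(delay_from_retry_after("120", fallback), Duration::from_secs(120));
    assert_eq!(delay_from_retry_after(" 3 ", fallback), Duration::from_secs(3));
    // An HTTP-date value is not handled here, so it falls back
    assert_eq!(
        delay_from_retry_after("Wed, 21 Oct 2026 07:28:00 GMT", fallback),
        fallback
    );
}
```

Inside a policy, the returned Duration would feed the tokio::time::sleep call in the retry future, as the BackoffPolicy example does with its computed delay.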
Key insight: Retry logic lives entirely in the policy, not the middleware. The Retry middleware just calls the policy after each response. This separation means you can compose policies, switch strategies without changing the service stack, and test retry logic independently. The policy can examine anything about the request/response to decide, and the future return type gives it full control over timing between attempts. Use Tower's built-in policies for common cases, implement Policy directly when you need custom decision logic.