Loading page…
Rust walkthroughs
Loading page…
How does tower::retry::Retry decide when to retry failed requests based on policy configuration?

tower::retry::Retry delegates retry decisions entirely to a Policy implementation that examines each request and response (or error) to determine whether retrying is appropriate. The policy evaluates the request type, response status, and error conditions through its retry method—returning Some with an updated policy to continue retrying, or None to stop retrying. This design separates the mechanics of retrying (handled by Retry) from the decision logic (handled by Policy), enabling custom strategies for different failure modes like transient network errors, rate limiting, or timeout conditions.
use tower::retry::Retry;
use tower::Service;
use tower::retry::Policy;
use std::time::Duration;
// A simple retry policy that retries up to N times
/// Retry policy that permits a fixed number of retries.
/// `remaining` counts how many retry attempts are still available.
#[derive(Clone)]
struct MaxRetries {
    remaining: usize,
}

impl MaxRetries {
    /// Builds a policy with `max` retries left.
    fn new(max: usize) -> Self {
        MaxRetries { remaining: max }
    }
}
/// Policy: retry every failure until the remaining budget hits zero.
impl<Req, Res, E> Policy<Req, Res, E> for MaxRetries
where
    Req: Clone,
{
    type Future = std::future::Ready<()>;

    /// Returns the successor policy when another retry is allowed.
    fn retry(&self, _req: &Req, result: Result<&Res, &E>) -> Option<Self> {
        // A successful response never triggers a retry.
        if result.is_ok() {
            return None;
        }
        // On failure, spend one unit of the remaining budget, if any is left.
        if self.remaining == 0 {
            None
        } else {
            Some(MaxRetries { remaining: self.remaining - 1 })
        }
    }

    /// Each retry needs its own copy of the request.
    fn clone_request(&self, req: &Req) -> Option<Req> {
        Some(req.clone())
    }
}
The Policy trait's retry method returns Some(policy) to continue retrying, or None to stop.
use tower::retry::Policy;
use std::future::Future;
// The Policy trait (simplified):
// pub trait Policy<Req, Res, E> {
// type Future: Future<Output = ()>;
//
// fn retry(&self, req: &Req, result: Result<&Res, &E>) -> Option<Self>;
//
// fn clone_request(&self, req: &Req) -> Option<Req>;
// }
// Key points:
// - retry() is called AFTER each request completes
// - If retry returns Some(new_policy), the request is retried
// - If retry returns None, retrying stops
// - clone_request() creates a fresh request for retry
// - The policy can be stateful (tracking retry count, backoff state, etc.)
// Policy implementations decide:
// - Which errors warrant retries
// - How many retries to attempt
// - What backoff strategy to use
// - Which requests are retryable
The Policy trait's methods control retry behavior based on request and result.
use tower::retry::Policy;
use std::time::Duration;
use std::future::Future;
use std::pin::Pin;
/// Retry policy that doubles the delay after each failed attempt,
/// capped at `max_delay`.
#[derive(Clone)]
struct ExponentialBackoff {
    max_retries: usize,   // total retries allowed
    attempt: usize,       // retries performed so far
    base_delay: Duration, // delay before the first retry
    max_delay: Duration,  // upper bound on any computed delay
}

impl ExponentialBackoff {
    /// Creates a policy starting at attempt 0.
    fn new(max_retries: usize, base_delay: Duration, max_delay: Duration) -> Self {
        Self {
            max_retries,
            attempt: 0,
            base_delay,
            max_delay,
        }
    }

    /// Backoff delay for the current attempt: base * 2^attempt, capped at
    /// `max_delay`.
    ///
    /// `2u32.pow(attempt)` overflows (panicking in debug builds) once
    /// `attempt >= 32`; use checked/saturating arithmetic and fall back to
    /// the cap, which is the correct answer for any huge exponent anyway.
    fn delay(&self) -> Duration {
        let factor = u32::try_from(self.attempt)
            .ok()
            .and_then(|a| 2u32.checked_pow(a));
        match factor {
            // saturating_mul avoids Duration overflow before the cap applies.
            Some(f) => self.base_delay.saturating_mul(f).min(self.max_delay),
            None => self.max_delay,
        }
    }
}
impl<Req, Res, E> Policy<Req, Res, E> for ExponentialBackoff
where
Req: Clone,
{
type Future = Pin<Box<dyn Future<Output = ()> + Send>>;
fn retry(&self, _req: &Req, result: Result<&Res, &E>) -> Option<Self> {
match result {
Ok(_) => None, // Success, don't retry
Err(_) => {
if self.attempt < self.max_retries {
Some(Self {
attempt: self.attempt + 1,
max_retries: self.max_retries,
base_delay: self.base_delay,
max_delay: self.max_delay,
})
} else {
None // Max retries exceeded
}
}
}
}
fn clone_request(&self, req: &Req) -> Option<Req> {
Some(req.clone())
}
}
// The Future type allows async delays before retry
// tower uses this for backoff between retriesThe policy can return a Future that completes after a backoff delay before retrying.
use tower::retry::Policy;
// Policy that retries only specific error variants, bounded by a maximum
// attempt count.
#[derive(Clone)]
struct SelectiveRetry {
    max_retries: usize, // maximum number of retries allowed
    attempts: usize,    // retries performed so far
}
/// Policy that retries only transient failures (timeouts and connection
/// errors), up to `max_retries` attempts. Client errors are never retried.
impl<Req, Res> Policy<Req, Res, HttpError> for SelectiveRetry
where
    Req: Clone,
{
    type Future = std::future::Ready<()>;

    /// Decides per error variant; the Timeout and ConnectionFailed arms were
    /// identical, so they are merged into one or-pattern.
    fn retry(&self, _req: &Req, result: Result<&Res, &HttpError>) -> Option<Self> {
        match result {
            // Success - don't retry.
            Ok(_) => None,
            // Transient failures are retryable while attempts remain.
            Err(HttpError::Timeout | HttpError::ConnectionFailed) => {
                if self.attempts < self.max_retries {
                    Some(Self {
                        max_retries: self.max_retries,
                        attempts: self.attempts + 1,
                    })
                } else {
                    None
                }
            }
            // 404/401 will not succeed on retry - don't retry.
            Err(HttpError::NotFound | HttpError::Unauthorized) => None,
        }
    }

    /// Retries operate on a fresh clone of the request.
    fn clone_request(&self, req: &Req) -> Option<Req> {
        Some(req.clone())
    }
}
// Error type distinguishing transient failures (retried by the policy
// above) from client errors (never retried).
#[derive(Debug)]
enum HttpError {
    Timeout,          // request timed out - transient, retried above
    ConnectionFailed, // could not connect - transient, retried above
    NotFound,         // 404 - retrying will not help
    Unauthorized,     // 401 - retrying will not help
}
Policies can inspect error types to decide which failures are retryable.
use tower::retry::Policy;
// Policy that retries responses whose HTTP status appears in a configured
// allow-list, bounded by a maximum attempt count.
#[derive(Clone)]
struct RetryOnStatus {
    retryable_statuses: Vec<u16>, // status codes that should trigger a retry
    max_retries: usize,           // maximum number of retries allowed
    attempts: usize,              // retries performed so far
}
/// Policy that retries based on the HTTP status of a *successful* response.
impl<Req> Policy<Req, HttpResponse, ()> for RetryOnStatus
where
    Req: Clone,
{
    type Future = std::future::Ready<()>;

    /// Retries when the status is in `retryable_statuses` and the attempt
    /// budget is not yet spent.
    fn retry(&self, _req: &Req, result: Result<&HttpResponse, &()>) -> Option<Self> {
        // Error case handled elsewhere: transport errors are not retried here.
        let response = result.ok()?;
        let status_is_retryable = self.retryable_statuses.contains(&response.status_code);
        if status_is_retryable && self.attempts < self.max_retries {
            Some(Self {
                retryable_statuses: self.retryable_statuses.clone(),
                max_retries: self.max_retries,
                attempts: self.attempts + 1,
            })
        } else {
            None
        }
    }

    /// Retries operate on a fresh clone of the request.
    fn clone_request(&self, req: &Req) -> Option<Req> {
        Some(req.clone())
    }
}
#[derive(Debug)]
struct HttpResponse {
status_code: u16,
body: String,
}
// Common retryable statuses: 429 (rate limit), 502, 503, 504 (server errors)Policies can inspect successful responses (like rate limiting headers) to trigger retries.
use tower::retry::Retry;
use tower::Service;
// Retry wraps an inner service with retry logic
// The service is called repeatedly based on policy decisions
async fn make_request<S, P>(service: &mut Retry<P, S>, request: Request) -> Result<Response, Error>
where
S: Service<Request, Response = Response, Error = Error> + Clone,
P: Policy<Request, Response, Error>,
{
service.call(request).await
}
// The Retry service:
// 1. Calls the inner service with the request
// 2. On completion, calls policy.retry() with the result
// 3. If policy.retry() returns Some(new_policy):
// - Clones the request (via policy.clone_request())
// - Waits for policy's Future to complete (for backoff)
// - Retries with the new policy
// 4. If policy.retry() returns None:
// - Returns the result (success or final failure)
Retry wraps any service and delegates retry decisions to the policy.
use tower::retry::Policy;
use std::time::Duration;
use rand::Rng;
// Exponential backoff policy whose delay gets a random jitter component,
// spreading retries from many clients apart in time.
#[derive(Clone)]
struct JitteredBackoff {
    max_retries: usize,   // total retries allowed
    attempt: usize,       // retries performed so far
    base_delay: Duration, // delay before the first retry
    max_delay: Duration,  // upper bound on the un-jittered delay
}
impl JitteredBackoff {
    /// Creates a policy starting at attempt 0.
    fn new(max_retries: usize, base_delay: Duration, max_delay: Duration) -> Self {
        Self {
            max_retries,
            attempt: 0,
            base_delay,
            max_delay,
        }
    }

    /// Exponential backoff delay with 0%-50% random jitter added.
    ///
    /// Fixes two latent panics in the naive version:
    /// - `2u32.pow(attempt)` overflows for attempt >= 32; use checked math
    ///   and fall back to the cap.
    /// - `rng.gen_range(0.0..0.0)` panics on an empty range, which happens
    ///   whenever the capped delay is zero; skip jitter in that case.
    fn delay_with_jitter(&self) -> Duration {
        let factor = u32::try_from(self.attempt)
            .ok()
            .and_then(|a| 2u32.checked_pow(a));
        let capped = match factor {
            Some(f) => self.base_delay.saturating_mul(f).min(self.max_delay),
            None => self.max_delay,
        };
        // Add random jitter (0% to 50% of the delay)
        let jitter_range = capped.as_millis() as f64 * 0.5;
        let jitter = if jitter_range > 0.0 {
            rand::thread_rng().gen_range(0.0..jitter_range) as u64
        } else {
            0
        };
        capped.saturating_add(Duration::from_millis(jitter))
    }
}
/// Policy: retry failures, advancing the attempt counter each time.
impl<Req, Res, E> Policy<Req, Res, E> for JitteredBackoff
where
    Req: Clone,
{
    type Future = std::future::Ready<()>;

    /// Moves to the next attempt on failure; stops once `max_retries` is hit.
    fn retry(&self, _req: &Req, result: Result<&Res, &E>) -> Option<Self> {
        // Stop on success, or once the retry budget is exhausted.
        if result.is_ok() || self.attempt >= self.max_retries {
            return None;
        }
        let mut next = self.clone();
        next.attempt += 1;
        Some(next)
    }

    /// Retries operate on a fresh clone of the request.
    fn clone_request(&self, req: &Req) -> Option<Req> {
        Some(req.clone())
    }
}
// Jitter prevents thundering herd problems when multiple clients
// retry simultaneously after a server failure
Jitter spreads out retry attempts across distributed clients to avoid synchronized retry storms.
use tower::retry::Policy;
// clone_request is called when retrying
// The request must be clonable for retries
// Example request type: deriving Clone is what makes retries possible.
#[derive(Clone)]
struct Request {
    path: String,  // request path
    body: Vec<u8>, // owned body bytes - deep-copied on every retry
}
// For non-cloneable requests, you have options:
// 1. Make the request cloneable (derive Clone)
// 2. Use Arc<Request> for cheap cloning
// 3. Return None from clone_request (no retries)
// Arc example:
use std::sync::Arc;
// Policy for requests wrapped in Arc, where cloning the request is a
// refcount bump rather than a deep copy.
#[derive(Clone)]
struct PolicyWithArc {
    max_retries: usize, // remaining retry budget (decremented on each retry)
}
/// Policy for `Arc`-wrapped requests: cheap to clone on every retry.
impl<Res, E> Policy<Arc<Request>, Res, E> for PolicyWithArc {
    type Future = std::future::Ready<()>;

    /// Spends one unit of the retry budget per failure; stops at zero.
    fn retry(&self, _req: &Arc<Request>, result: Result<&Res, &E>) -> Option<Self> {
        if result.is_err() && self.max_retries > 0 {
            Some(Self {
                max_retries: self.max_retries - 1,
            })
        } else {
            None
        }
    }

    /// Arc::clone is cheap - only the reference count changes.
    fn clone_request(&self, req: &Arc<Request>) -> Option<Arc<Request>> {
        Some(Arc::clone(req))
    }
}
Requests must be clonable; Arc provides cheap cloning for expensive requests.
use tower::retry::Policy;
// Policy that consults the request itself (via the Idempotent trait)
// before allowing a retry.
#[derive(Clone)]
struct IdempotentPolicy {
    max_retries: usize, // maximum number of retries allowed
    attempts: usize,    // retries performed so far
}
/// Policy that only retries requests marked idempotent.
impl<Req, Res, E> Policy<Req, Res, E> for IdempotentPolicy
where
    Req: Clone + Idempotent,
{
    type Future = std::future::Ready<()>;

    /// Retries a failure only when the request is safe to repeat and the
    /// attempt budget is not exhausted.
    fn retry(&self, req: &Req, result: Result<&Res, &E>) -> Option<Self> {
        if result.is_ok() {
            return None;
        }
        let can_retry = req.is_idempotent() && self.attempts < self.max_retries;
        can_retry.then(|| Self {
            max_retries: self.max_retries,
            attempts: self.attempts + 1,
        })
    }

    /// Retries operate on a fresh clone of the request.
    fn clone_request(&self, req: &Req) -> Option<Req> {
        Some(req.clone())
    }
}
/// Ability of a request to report whether it is safe to repeat.
trait Idempotent {
    /// Returns true when replaying the request cannot cause duplicate
    /// side effects.
    fn is_idempotent(&self) -> bool;
}
// HTTP methods:
// - GET, HEAD, OPTIONS, TRACE: Always idempotent
// - PUT, DELETE: Idempotent (same request = same effect)
// - POST: NOT idempotent (may create resources)
// - PATCH: Depends on implementation
// Only retry idempotent operations to avoid duplicate effects
Retrying non-idempotent operations can cause duplicate side effects; policies should check idempotency.
use tower::ServiceBuilder;
use tower::retry::Retry;
use tower::limit::concurrency::ConcurrencyLimit;
use std::time::Duration;
/// Wraps `inner` with a concurrency limit and retry middleware.
///
/// NOTE(review): `ConcurrencyLimit` and `Retry` are Services, not Layers,
/// so they cannot be passed to `.layer(...)` directly as the original did.
/// Use the `ServiceBuilder` convenience methods (or equivalently
/// `ConcurrencyLimitLayer::new(10)` / `RetryLayer::new(policy)`).
async fn build_service<S>(inner: S) -> impl tower::Service<Request>
where
    S: tower::Service<Request> + Clone + Send + 'static,
    S::Future: Send,
{
    // Up to 3 retries, starting at 100ms and capped at 5s.
    let policy = ExponentialBackoff::new(3, Duration::from_millis(100), Duration::from_secs(5));
    ServiceBuilder::new()
        // Concurrency limit before retry: applies to all calls, including retries.
        .concurrency_limit(10)
        // Retry middleware driven by the policy.
        .retry(policy)
        // Inner service
        .service(inner)
}
// Layer ordering matters:
// - Retry is typically outer layer (retries on failures from inner layers)
// - Concurrency limit applies to all requests including retries
// - Timeout should be inner (retry the whole operation, not just timeout)
ServiceBuilder stacks retry with other middleware in the correct order.
use tower::retry::Retry;
use tower::retry::Policy;
use tower::Service;
use std::time::Duration;
/// Retry policy for HTTP calls: tracks how many attempts have been made.
#[derive(Clone)]
struct HttpRetryPolicy {
    max_retries: usize, // maximum retries allowed
    attempt: usize,     // retries performed so far
}

impl HttpRetryPolicy {
    /// Starts a policy at attempt 0 with the given retry budget.
    fn new(max_retries: usize) -> Self {
        HttpRetryPolicy {
            max_retries,
            attempt: 0,
        }
    }
}
impl Policy<HttpRequest, HttpResponse, HttpError> for HttpRetryPolicy {
type Future = std::future::Ready<()>;
fn retry(&self, req: &HttpRequest, result: Result<&HttpResponse, &HttpError>) -> Option<Self> {
match result {
Ok(response) => {
// Retry on server errors (5xx) and rate limiting (429)
if response.status >= 500 || response.status == 429 {
if self.attempt < self.max_retries {
println!("Retrying due to status {}", response.status);
Some(Self {
max_retries: self.max_retries,
attempt: self.attempt + 1,
})
} else {
None
}
} else {
None
}
}
Err(error) => {
// Retry on transient errors
match error {
HttpError::Timeout | HttpError::ConnectionFailed => {
if self.attempt < self.max_retries {
println!("Retrying due to error: {:?}", error);
Some(Self {
max_retries: self.max_retries,
attempt: self.attempt + 1,
})
} else {
None
}
}
HttpError::InvalidRequest(_) => None, // Don't retry bad requests
}
}
}
}
fn clone_request(&self, req: &HttpRequest) -> Option<HttpRequest> {
// Only retry if body is cloneable or if method is safe
if req.method.is_idempotent() {
Some(req.clone())
} else {
println!("Not retrying non-idempotent request");
None
}
}
}
// Example HTTP request; Clone is required so the policy can re-send it.
#[derive(Clone, Debug)]
struct HttpRequest {
    method: HttpMethod,    // consulted by clone_request for idempotency
    path: String,
    body: Option<Vec<u8>>, // cloned on retry when present
}
impl HttpRequest {
fn is_idempotent(&self) -> bool {
matches!(self.method, HttpMethod::Get | HttpMethod::Put | HttpMethod::Delete)
}
}
/// HTTP verbs used by the example request type.
#[derive(Clone, Debug)]
enum HttpMethod {
    Get,
    Post,
    Put,
    Delete,
}

impl HttpMethod {
    /// True for verbs that are safe to replay (GET/PUT/DELETE); POST may
    /// create a new resource on every send, so it is excluded.
    fn is_idempotent(&self) -> bool {
        match self {
            Self::Get | Self::Put | Self::Delete => true,
            Self::Post => false,
        }
    }
}
// Example HTTP response; `status` drives the retry decision above
// (5xx and 429 are retried).
#[derive(Debug)]
struct HttpResponse {
    status: u16, // HTTP status code
    body: String,
}
// Error type for the example client: transient errors are retried by
// HttpRetryPolicy, invalid requests never are.
#[derive(Debug)]
enum HttpError {
    Timeout,                // transient - retried
    ConnectionFailed,       // transient - retried
    InvalidRequest(String), // caller bug - never retried
}
A complete policy considers both error types and response statuses for retry decisions.
// Policy is cloned on each retry, enabling stateful tracking
// Policy carrying cumulative state: an attempt counter plus a wall-clock
// deadline for the whole retry sequence.
#[derive(Clone)]
struct TrackingPolicy {
    attempts: usize,                // retries performed so far
    max_retries: usize,             // maximum retries allowed
    start_time: std::time::Instant, // when the overall operation began
    timeout: std::time::Duration,   // wall-clock budget for all attempts
}
/// Policy: retry failures until either the attempt budget or the
/// wall-clock deadline is exhausted.
impl<Req, Res, E> Policy<Req, Res, E> for TrackingPolicy
where
    Req: Clone,
{
    type Future = std::future::Ready<()>;

    fn retry(&self, _req: &Req, result: Result<&Res, &E>) -> Option<Self> {
        // The deadline bounds the whole retry sequence, not one attempt.
        if self.start_time.elapsed() > self.timeout {
            println!("Retry timeout exceeded");
            return None;
        }
        // Success ends the loop with no further output.
        if result.is_ok() {
            return None;
        }
        if self.attempts >= self.max_retries {
            println!("Max retries ({}) exceeded", self.max_retries);
            return None;
        }
        println!("Retry attempt {} of {}", self.attempts + 1, self.max_retries);
        let mut next = self.clone();
        next.attempts += 1;
        Some(next)
    }

    /// Retries operate on a fresh clone of the request.
    fn clone_request(&self, req: &Req) -> Option<Req> {
        Some(req.clone())
    }
}
// The policy returned from retry() becomes the policy for the next attempt
// This enables tracking cumulative state across retries
Each retry gets a new policy instance; state carries over via the returned Some(new_policy).
use tower::retry::Budget;
use std::time::Duration;
// Budget limits how much retry can be done across many requests
// This prevents cascading failures in high-traffic scenarios
// Builds a shared retry budget: the intent is "at most 100 retries per
// 60-second window".
//
// NOTE(review): this call does not match the tower API. In tower 0.4 the
// type lives at `tower::retry::budget::Budget` and `Budget::new` takes
// (ttl: Duration, min_per_sec: u32, retry_percent: f32) — verify the
// argument order and types against the tower version in use.
fn create_budget() -> Budget {
    Budget::new(
        100, // Max retries
        Duration::from_secs(60), // Per 60 seconds
    )
}
// Budget is shared across multiple requests
// When budget is exhausted, retries are denied
// This protects services under load from retry storms
// Budget::deposit() adds to the budget (for successful requests)
// Budget::withdraw() removes from budget (for retries)
Budget limits total retries across all requests, preventing retry storms during outages.
Policy decision flow:
1. Retry::call(request) invokes the inner service, producing a Result<Response, Error>.
2. policy.retry(request, result) is called with that result.
3. Some(new_policy): clone the request, wait for the policy's future (backoff), then retry with the new policy.
4. None: return the result (success or final failure).

Policy implementation considerations:
| Concern | Policy Method | Purpose |
|---------|---------------|---------|
| Retry decision | retry() | Returns Some to retry, None to stop |
| Request cloning | clone_request() | Creates fresh request for retry |
| Backoff delay | Future return type | Controls delay before retry |
When to retry:
When NOT to retry:
The fundamental insight: tower::retry::Retry is a mechanism that executes retries, while Policy is the decision logic that determines whether to retry. This separation enables composing retry behavior with different strategies—exponential backoff, jittered retries, budget-limited retries—without changing the retry infrastructure. The policy is stateful and carried through the retry loop, allowing sophisticated tracking of attempts, timing, and conditions across the entire retry sequence.