What is the purpose of tower::load_shed::LoadShedLayer for handling service overload?

LoadShedLayer wraps a service with overload protection that rejects requests when the service becomes overloaded, implementing a "fail-fast" strategy rather than queuing requests indefinitely. Unlike a concurrency limiter, LoadShed does not count in-flight requests itself: its poll_ready always reports ready, and if the inner service was not ready at that moment (for example, because a ConcurrencyLimit or RateLimit layer beneath it is exhausted), call rejects the request immediately with an Overloaded error. This prevents cascading failures where backed-up services consume memory and resources queuing requests they cannot process, allowing callers to respond appropriately (retry, route elsewhere, or return a graceful error) rather than timing out waiting for responses that may never come.

Basic LoadShedLayer Usage

use tower::ServiceBuilder;
use tower::load_shed::LoadShedLayer;
use tower::Service;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
 
// A simple echo service
#[derive(Clone)]
struct EchoService;
 
impl Service<String> for EchoService {
    type Response = String;
    type Error = String;
    type Future = Pin<Box<dyn Future<Output = Result<String, String>> + Send>>;
 
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
 
    fn call(&mut self, req: String) -> Self::Future {
        Box::pin(async move {
            Ok(format!("Echo: {}", req))
        })
    }
}
 
#[tokio::main]
async fn main() {
    // LoadShedLayer::new() takes no arguments: it sheds load whenever the
    // inner service is not ready, so pair it with a concurrency limit
    use tower::limit::ConcurrencyLimitLayer;

    let service = ServiceBuilder::new()
        .layer(LoadShedLayer::new())
        .layer(ConcurrencyLimitLayer::new(10)) // Reject beyond 10 in-flight requests
        .service(EchoService);
 
    println!("Service wrapped with load shedding, max 10 concurrent requests");
}

LoadShedLayer::new() takes no concurrency argument; the load shedder rejects whenever the service beneath it is not ready. To bound concurrency at 10, stack it over ConcurrencyLimitLayer::new(10); requests beyond that limit are then rejected rather than queued.

How Load Shedding Works

use tower::load_shed::LoadShedLayer;
use tower::Service;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
 
// Track request processing for demonstration
#[derive(Clone)]
struct TrackedService {
    in_flight: Arc<AtomicUsize>,
    max_seen: Arc<AtomicUsize>,
}
 
impl Service<()> for TrackedService {
    type Response = ();
    type Error = &'static str;
    type Future = std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), &'static str>> + Send>>;
 
    fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
        // Service is always ready to accept
        std::task::Poll::Ready(Ok(()))
    }
 
    fn call(&mut self, _: ()) -> Self::Future {
        let in_flight = self.in_flight.clone();
        let max_seen = self.max_seen.clone();
        
        // Increment in-flight counter
        let count = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
        
        // Track max
        let mut current_max = max_seen.load(Ordering::SeqCst);
        while count > current_max {
            match max_seen.compare_exchange_weak(
                current_max,
                count,
                Ordering::SeqCst,
                Ordering::SeqCst,
            ) {
                Ok(_) => break,
                Err(actual) => current_max = actual,
            }
        }
        
        Box::pin(async move {
            // Simulate some work
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            in_flight.fetch_sub(1, Ordering::SeqCst);
            Ok(())
        })
    }
}
 
#[tokio::main]
async fn main() {
    use tower::ServiceBuilder;
    use tower::limit::ConcurrencyLimitLayer;
    
    let in_flight = Arc::new(AtomicUsize::new(0));
    let max_seen = Arc::new(AtomicUsize::new(0));
    
    let _service = ServiceBuilder::new()
        .layer(LoadShedLayer::new())           // Shed when the limiter below is exhausted
        .layer(ConcurrencyLimitLayer::new(3))  // Max 3 concurrent
        .service(TrackedService {
            in_flight: in_flight.clone(),
            max_seen: max_seen.clone(),
        });
    
    // Without load shedding, callers beyond the limit would wait on poll_ready
    // With load shedding, they get an immediate Overloaded error instead
    println!("Load shed configured with concurrency limit of 3");
}
}

The ConcurrencyLimit layer tracks in-flight requests and stops being ready at the limit; LoadShed converts that "not ready" state into an immediate rejection.

Concurrency Limit Configuration

use tower::load_shed::LoadShedLayer;
use tower::ServiceBuilder;
 
fn configure_limits() {
    use tower::limit::ConcurrencyLimitLayer;
    
    // Aggressive load shedding (low limit)
    let _aggressive = ServiceBuilder::new()
        .layer(LoadShedLayer::new())
        .layer(ConcurrencyLimitLayer::new(5));
    
    // Conservative load shedding (higher limit)
    let _conservative = ServiceBuilder::new()
        .layer(LoadShedLayer::new())
        .layer(ConcurrencyLimitLayer::new(100));
    
    // The limit should be based on:
    // 1. Service processing time
    // 2. Available resources (memory, CPU, connections)
    // 3. Acceptable latency for queued requests
    
    // Lower limits = faster rejection, better latency for accepted requests
    // Higher limits = more requests accepted, but higher latency when near capacity
}
 
fn main() {
    configure_limits();
}

The concurrency limit should be tuned based on service characteristics and requirements.
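One rough way to pick a starting point is Little's law: average in-flight requests equal arrival rate times average latency. The helper below is a back-of-envelope sketch with made-up numbers (the function name and headroom factor are illustrative), not a substitute for measuring your own service:

```rust
// Little's law: in-flight ≈ throughput (req/s) × latency (s).
// A headroom factor above 1.0 leaves room for bursts.
fn suggested_limit(target_rps: f64, avg_latency_secs: f64, headroom: f64) -> usize {
    (target_rps * avg_latency_secs * headroom).ceil() as usize
}

fn main() {
    // Hypothetical service: 200 req/s at 50 ms average latency, 1.5x headroom
    let limit = suggested_limit(200.0, 0.050, 1.5);
    println!("suggested concurrency limit: {}", limit); // 15
}
```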

Integration with Tower Middleware Stack

use tower::ServiceBuilder;
use tower::load_shed::LoadShedLayer;
use tower::limit::{ConcurrencyLimitLayer, RateLimitLayer};
use tower::timeout::TimeoutLayer;
use std::time::Duration;
 
#[tokio::main]
async fn main() {
    // Common pattern: combine load shedding with other middleware
    let service = ServiceBuilder::new()
        // Load shed first - reject immediately when overloaded
        .layer(LoadShedLayer::new(50))
        // Then apply rate limiting
        .layer(RateLimitLayer::new(100, Duration::from_secs(1)))
        // Then apply timeout for actual processing
        .layer(TimeoutLayer::new(Duration::from_secs(5)))
        .service(/* your service */);
    
    // Order matters:
    // 1. LoadShedLayer: Quick rejection when at capacity
    // 2. RateLimitLayer: Control request rate
    // 3. TimeoutLayer: Limit processing time
    
    // If LoadShedLayer is after RateLimitLayer, rate-limited requests
    // would still count against the concurrency limit
}

LoadShedLayer is typically placed early in the middleware stack for fast rejection.

Rejected Request Handling

use tower::load_shed::LoadShedLayer;
use tower::{Service, ServiceBuilder};
use std::future::Future;
use std::pin::Pin;
 
#[derive(Clone)]
struct MyService;
 
impl Service<String> for MyService {
    type Response = String;
    type Error = String;
    type Future = Pin<Box<dyn Future<Output = Result<String, String>> + Send>>;
 
    fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
        std::task::Poll::Ready(Ok(()))
    }
 
    fn call(&mut self, req: String) -> Self::Future {
        Box::pin(async move { Ok(format!("Processed: {}", req)) })
    }
}
 
#[tokio::main]
async fn main() {
    use tower::limit::ConcurrencyLimitLayer;
    
    let mut service = ServiceBuilder::new()
        .layer(LoadShedLayer::new())
        .layer(ConcurrencyLimitLayer::new(1)) // Very low limit for demonstration
        .service(MyService);
    
    // When overloaded, call() returns an error immediately
    use tower::ServiceExt;
    
    // First request should succeed
    match service.ready().await {
        Ok(svc) => {
            match svc.call("test".to_string()).await {
                Ok(response) => println!("Success: {}", response),
                Err(e) => println!("Error: {}", e),
            }
        }
        Err(e) => println!("Service not ready: {}", e),
    }
}

When overloaded, the service returns errors rather than queuing requests.

The Overloaded Error Type

use tower::load_shed::error::Overloaded;
 
fn handle_load_shed_error() {
    // LoadShed boxes the inner service's error type into tower::BoxError,
    // and produces an Overloaded error when it rejects a request
    
    // Example error handling pattern (err is a tower::BoxError):
    // match service.call(request).await {
    //     Ok(response) => process(response),
    //     Err(err) if err.is::<Overloaded>() => {
    //         // Service is overloaded - options:
    //         // 1. Return error to client (503 Service Unavailable)
    //         // 2. Retry with backoff
    //         // 3. Route to different instance
    //         // 4. Use cached fallback
    //     }
    //     Err(other) => handle_other_error(other),
    // }
    
    // Overloaded carries no payload; it simply signals the rejection
    println!("Overloaded errors indicate service capacity exceeded");
}
}

The Overloaded error type signals that the rejection was due to capacity, not a logic error.

Combining with Retry Logic

use tower::ServiceBuilder;
use tower::load_shed::LoadShedLayer;
use tower::limit::ConcurrencyLimitLayer;
 
#[tokio::main]
async fn main() {
    // Pattern: load shed + retry for resilience
    let service = ServiceBuilder::new()
        .layer(LoadShedLayer::new())
        .layer(ConcurrencyLimitLayer::new(10))
        // Note: retry should be carefully considered with load shed
        // Retrying on overload can make the problem worse
        .service(/* your service */);
    
    // Better pattern: use the load shed error for routing decisions
    // rather than automatic retry, which can amplify load
}
    
    // Better pattern: Use load shed error for routing decisions
    // rather than automatic retry, which can amplify load
}

Be cautious combining load shedding with automatic retries—they can amplify load.

Graceful Degradation Pattern

use tower::load_shed::{error::Overloaded, LoadShed, LoadShedLayer};
use tower::limit::{ConcurrencyLimit, ConcurrencyLimitLayer};
use tower::{Service, ServiceBuilder, ServiceExt};
use std::future::Future;
use std::pin::Pin;
 
// The builder below wraps MyService in ConcurrencyLimit, then LoadShed
type Shedding = LoadShed<ConcurrencyLimit<MyService>>;
 
struct ApiClient {
    primary: Shedding,
    fallback: Shedding,
}
 
impl ApiClient {
    async fn call_with_fallback(&mut self, request: String) -> Result<String, String> {
        // Try primary first
        match self.primary.ready().await {
            Ok(svc) => match svc.call(request.clone()).await {
                Ok(response) => return Ok(response),
                // LoadShed boxes errors; downcast to detect overload
                Err(e) if e.is::<Overloaded>() => {
                    // Primary is overloaded, try fallback
                    println!("Primary overloaded, using fallback");
                }
                Err(e) => return Err(e.to_string()),
            },
            Err(e) => {
                println!("Primary not ready: {}", e);
            }
        }
        
        // Fallback
        self.fallback.ready().await
            .map_err(|e| e.to_string())?
            .call(request)
            .await
            .map_err(|e| e.to_string())
    }
}
 
#[derive(Clone)]
struct MyService;
 
impl Service<String> for MyService {
    type Response = String;
    type Error = String;
    type Future = Pin<Box<dyn Future<Output = Result<String, String>> + Send>>;
 
    fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
        std::task::Poll::Ready(Ok(()))
    }
 
    fn call(&mut self, req: String) -> Self::Future {
        Box::pin(async move { Ok(format!("Handled: {}", req)) })
    }
}
 
#[tokio::main]
async fn main() {
    let mut client = ApiClient {
        primary: ServiceBuilder::new()
            .layer(LoadShedLayer::new())
            .layer(ConcurrencyLimitLayer::new(5))
            .service(MyService),
        fallback: ServiceBuilder::new()
            .layer(LoadShedLayer::new())
            .layer(ConcurrencyLimitLayer::new(10))
            .service(MyService),
    };
    
    let response = client.call_with_fallback("req-1".to_string()).await;
    println!("Response: {:?}", response);
}

Use load shed errors to trigger fallback behavior.

Monitoring and Metrics

use tower::ServiceBuilder;
use tower::load_shed::LoadShedLayer;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
 
struct Metrics {
    requests_accepted: AtomicU64,
    requests_rejected: AtomicU64,
    requests_in_flight: AtomicU64,
}
 
impl Metrics {
    fn new() -> Self {
        Metrics {
            requests_accepted: AtomicU64::new(0),
            requests_rejected: AtomicU64::new(0),
            requests_in_flight: AtomicU64::new(0),
        }
    }
    
    fn accept(&self) {
        self.requests_accepted.fetch_add(1, Ordering::Relaxed);
    }
    
    fn reject(&self) {
        self.requests_rejected.fetch_add(1, Ordering::Relaxed);
    }
    
    fn report(&self) {
        println!(
            "Accepted: {}, Rejected: {}, In-flight: {}",
            self.requests_accepted.load(Ordering::Relaxed),
            self.requests_rejected.load(Ordering::Relaxed),
            self.requests_in_flight.load(Ordering::Relaxed)
        );
    }
}
 
fn configure_with_metrics(_metrics: Arc<Metrics>, limit: usize) {
    use tower::limit::ConcurrencyLimitLayer;
    
    let _service = ServiceBuilder::new()
        .layer(LoadShedLayer::new())
        .layer(ConcurrencyLimitLayer::new(limit))
        .service(/* your service */);
    
    // Metrics collection would typically be done in custom middleware
    // wrapped around the load-shedded service
}
 
fn main() {
    let metrics = Arc::new(Metrics::new());
    configure_with_metrics(metrics.clone(), 50);
}

Monitor load shed metrics to understand capacity and tune limits.

When to Use Load Shedding

// Good use cases for LoadShedLayer:
 
// 1. Services with bounded resource requirements
//    - Request processing uses predictable resources
//    - Clear relationship between concurrency and resource use
 
// 2. Distributed systems with fallback options
//    - Multiple service instances available
//    - Client can route to other instances
 
// 3. APIs with clear SLAs
//    - Need predictable latency
//    - Prefer fast failure over slow success
 
// 4. Batch processing systems
//    - Can reschedule work for later
//    - Better to reject than to queue indefinitely
 
// Less appropriate:
 
// 1. Interactive user requests (no fallback)
//    - User expects response, not error
//    - Consider queueing with timeout instead
 
// 2. Services with highly variable request costs
//    - Concurrency limit doesn't account for cost variance
//    - Consider cost-aware load shedding instead
 
// 3. Systems without monitoring
//    - Can't tune limits without visibility
//    - Risk of under-utilizing capacity
 
fn main() {}

Load shedding works best when you have fallback options and predictable request costs.

Load Shed vs Buffer vs Concurrency Limit

use tower::ServiceBuilder;
use tower::load_shed::LoadShedLayer;
use tower::buffer::BufferLayer;
use tower::limit::ConcurrencyLimitLayer;
 
#[tokio::main]
async fn main() {
    // LoadShedLayer: convert "not ready" into an immediate Overloaded error
    let load_shed = ServiceBuilder::new()
        .layer(LoadShedLayer::new())
        .layer(ConcurrencyLimitLayer::new(10))
        .service(/* service */);
    
    // BufferLayer: put a bounded queue (channel) in front of the service;
    // requests wait in the queue until the service has capacity
    let buffered = ServiceBuilder::new()
        .layer(BufferLayer::new(10))
        .service(/* service */);
    
    // ConcurrencyLimitLayer: allow up to N in-flight requests and exert
    // backpressure through poll_ready; excess callers wait, nothing is rejected
    let limited = ServiceBuilder::new()
        .layer(ConcurrencyLimitLayer::new(10))
        .service(/* service */);
    
    // Key differences:
    // - LoadShed: immediate rejection, no queueing
    // - Buffer: queues requests, processes later
    // - ConcurrencyLimit: callers wait, with backpressure
    
    // Use LoadShed when you want fast failure
    // Use Buffer/ConcurrencyLimit when you want eventual processing
}
}

LoadShedLayer rejects excess requests immediately, while Buffer and ConcurrencyLimit make them wait.

Synthesis

Quick reference:

use tower::ServiceBuilder;
use tower::load_shed::LoadShedLayer;
use tower::limit::{ConcurrencyLimitLayer, RateLimitLayer};
use tower::timeout::TimeoutLayer;
use std::time::Duration;
 
// Basic usage: LoadShedLayer::new() takes no arguments; pair it with a
// concurrency limit to bound in-flight requests
let service = ServiceBuilder::new()
    .layer(LoadShedLayer::new())
    .layer(ConcurrencyLimitLayer::new(50)) // Max 50 concurrent requests
    .service(your_service);
 
// Load shedding provides:
// 1. Immediate rejection when the inner service is not ready
// 2. No request queueing memory pressure
// 3. Clear signal to callers (Overloaded error)
// 4. Predictable latency for accepted requests
 
// Configuration considerations:
// - Set the concurrency limit based on service capacity
// - Lower limits = more rejections, faster responses
// - Higher limits = fewer rejections, potential queue delays
 
// Error handling:
// - Downcast the BoxError and check for Overloaded specifically
// - Consider fallback routing on overload
// - Avoid automatic retry (can amplify load)
 
// Common pattern: early in the middleware stack
let service = ServiceBuilder::new()
    .layer(LoadShedLayer::new())            // Quick rejection
    .layer(ConcurrencyLimitLayer::new(100)) // Bound in-flight work
    .layer(RateLimitLayer::new(50, Duration::from_secs(1)))
    .layer(TimeoutLayer::new(Duration::from_secs(5)))
    .service(your_service);

Key insight: LoadShedLayer converts backpressure into immediate rejection. In Tower, an overloaded service normally signals overload by returning Pending from poll_ready, which makes callers wait; rather than letting requests pile up behind that signal (consuming memory and creating unpredictable latency), LoadShed reports itself ready, and if the inner stack was not ready it fails the call at once with an Overloaded error. This "fail fast" approach is essential for distributed systems because queued requests create backpressure that propagates upstream, potentially causing cascade failures. When LoadShedLayer rejects a request, the caller receives an immediate Overloaded error and can make an informed decision: route to another instance, return a cached response, or propagate a clear 503 error. Without load shedding, services under load accumulate queued requests, memory usage grows, latency spikes, and eventually the whole service can fall over. The key is tuning the underlying concurrency limit appropriately: too low and you waste capacity; too high and you have effectively reimplemented an unbounded queue. The limit should reflect your service's actual processing capacity, that is, the number of concurrent requests it can handle while maintaining acceptable latency and resource utilization.