How does tower::load_shed::LoadShed handle overload protection for backend services?
tower::load_shed::LoadShed provides overload protection by wrapping a Tower service and rejecting requests when the underlying service is overloaded, returning a load-shed error instead of queuing requests or letting latency spike. It works off the inner service's readiness: when a new request arrives and the service is already at capacity, LoadShed immediately returns an error rather than adding to a queue. This fail-fast behavior prevents cascading failures, where a slow service accumulates requests until it exhausts memory or times out upstream, and it lets callers implement fallback strategies such as retrying later, serving cached responses, or routing to alternative backends. LoadShed integrates with Tower's middleware ecosystem, combining naturally with other layers like retries, timeouts, and rate limits to build resilient service architectures that degrade gracefully under load rather than failing catastrophically.
// Without load shedding, overloaded services exhibit several failure modes:
// 1. Request queuing - requests pile up waiting for service
// 2. Latency spike - queue wait time adds to response time
// 3. Memory exhaustion - queued requests consume memory
// 4. Cascading failures - upstream services timeout and retry
// 5. Death spiral - retries increase load, making things worse
// Consider a database service that can handle 100 concurrent queries:
// - At 50 concurrent: 10ms latency
// - At 100 concurrent: 50ms latency
// - At 200 concurrent: requests queue, latency hits 500ms+
// - At 500 concurrent: OOM or timeout cascade
// Load shedding prevents this by rejecting requests at capacity:
// - At 100 concurrent: new requests get immediate error
// - Callers can retry, use cache, or fail gracefully
// - Service stays responsive for accepted requests
fn main() {
println!("Load shedding prevents cascading failures");
}
Without protection, services accumulate requests until they fail catastrophically.
use tower::{Service, ServiceBuilder, ServiceExt};
use std::task::{Context, Poll};
use std::pin::Pin;
use std::future::Future;
use std::time::Duration;
use tokio::time::sleep;

// A simple echo service (Clone so each task can own a handle to the stack)
#[derive(Clone)]
struct EchoService;

impl Service<String> for EchoService {
    type Response = String;
    type Error = std::convert::Infallible;
    type Future = Pin<Box<dyn Future<Output = Result<String, Self::Error>> + Send>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: String) -> Self::Future {
        Box::pin(async move {
            // Simulate some work
            sleep(Duration::from_millis(100)).await;
            Ok(format!("Echo: {}", req))
        })
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // LoadShed wraps a service and rejects when at capacity
    let service = ServiceBuilder::new()
        .load_shed()          // Reject when not ready
        .concurrency_limit(3) // Max 3 concurrent requests
        .service(EchoService);

    // Send requests concurrently; each task gets its own clone
    // (clones share the concurrency-limit semaphore)
    let mut handles = vec![];
    for i in 0..5 {
        let svc = service.clone();
        handles.push(tokio::spawn(async move {
            svc.oneshot(format!("request {}", i)).await
        }));
    }

    // Some will succeed, some will be load shed
    for (i, handle) in handles.into_iter().enumerate() {
        match handle.await? {
            Ok(response) => println!("Request {}: {}", i, response),
            Err(_) => println!("Request {}: Load shed!", i),
        }
    }
    Ok(())
}
LoadShedLayer wraps a service and rejects requests when it's not ready.
use tower::Service;
use std::task::{Context, Poll};
use std::pin::Pin;
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
// A service that tracks concurrency (illustrative; not driven in main)
#[allow(dead_code)]
struct TrackedService {
in_flight: Arc<AtomicUsize>,
max_concurrency: usize,
}
impl Service<String> for TrackedService {
type Response = String;
type Error = std::convert::Infallible;
type Future = Pin<Box<dyn Future<Output = Result<String, Self::Error>> + Send>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// poll_ready returns Ready(Ok) if we can accept more requests
// Returns Pending if we're at capacity
let current = self.in_flight.load(Ordering::Relaxed);
if current < self.max_concurrency {
Poll::Ready(Ok(()))
} else {
    // At capacity. In real code you would store the waker from `_cx`
    // before returning Pending; here LoadShed intercepts the Pending
    // and returns an error instead of waiting.
    Poll::Pending
}
}
fn call(&mut self, _req: String) -> Self::Future {
let in_flight = self.in_flight.clone();
in_flight.fetch_add(1, Ordering::Relaxed);
Box::pin(async move {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
in_flight.fetch_sub(1, Ordering::Relaxed);
Ok("response".to_string())
})
}
}
fn main() {
// The key insight: LoadShed intercepts poll_ready
// When the inner service returns Pending, LoadShed converts it to:
// Ready(Err(Overloaded))
//
// This means:
// 1. Callers get immediate feedback (not stuck waiting)
// 2. The service can reject work it can't handle
// 3. Backpressure is communicated explicitly
println!("LoadShed converts Pending poll_ready to Ready(Err)");
}
LoadShed intercepts poll_ready() and converts Pending into an immediate error.
use tower::{Service, ServiceBuilder, ServiceExt};
use std::task::{Context, Poll};
use std::pin::Pin;
use std::future::Future;
struct SimpleService;
impl Service<String> for SimpleService {
type Response = String;
type Error = std::convert::Infallible;
type Future = Pin<Box<dyn Future<Output = Result<String, Self::Error>> + Send>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: String) -> Self::Future {
Box::pin(async move { Ok(req) })
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut service = ServiceBuilder::new()
.load_shed()
.concurrency_limit(1)
.service(SimpleService);
// The error type changes when LoadShed is added
// It wraps the inner error in tower::load_shed::error::Overloaded
// First request should succeed
let result = service.ready().await?.call("test".to_string()).await;
println!("First request: {:?}", result);
// If we exceed capacity, we get Overloaded error
// The error indicates the service couldn't accept the request
Ok(())
}
LoadShed returns an Overloaded error when rejecting requests.
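In tower 0.4 the rejection surfaces as a boxed tower::load_shed::error::Overloaded, which callers can detect by downcasting the BoxError. The sketch below shows the downcast pattern with plain std only; the Overloaded stand-in is a hypothetical replacement for tower's real type so the snippet runs without the crate.

```rust
use std::error::Error;
use std::fmt;

// Stand-in for tower::load_shed::error::Overloaded (assumption: the real
// type implements std::error::Error, which lets BoxError downcast to it).
#[derive(Debug)]
struct Overloaded;

impl fmt::Display for Overloaded {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "service overloaded")
    }
}

impl Error for Overloaded {}

type BoxError = Box<dyn Error + Send + Sync>;

// Classify a boxed error: was the request shed, or did something else fail?
fn is_shed(err: &BoxError) -> bool {
    err.is::<Overloaded>()
}

fn main() {
    let shed: BoxError = Box::new(Overloaded);
    let other: BoxError = "connection refused".into();
    println!("shed error is Overloaded: {}", is_shed(&shed));
    println!("other error is Overloaded: {}", is_shed(&other));
}
```

Downcasting is more robust than matching on the error's display string, which can change between crate versions.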
use tower::{Service, ServiceBuilder, ServiceExt};
use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Common pattern: LoadShed + ConcurrencyLimit
    // ConcurrencyLimit: limits in-flight requests
    // LoadShed: rejects when at limit (instead of queuing)
    // Without LoadShed:
    // - ConcurrencyLimit would queue requests
    // - Callers wait in line, adding latency
    // With LoadShed:
    // - Requests over limit get immediate error
    // - Callers can fail fast
    let service = tower::service_fn(|req: String| async move {
        sleep(Duration::from_millis(50)).await;
        Ok::<_, std::convert::Infallible>(format!("processed: {}", req))
    });

    let service = ServiceBuilder::new()
        .load_shed()                     // Reject when at capacity
        .concurrency_limit(3)            // Max 3 concurrent
        .timeout(Duration::from_secs(1)) // Timeout for safety
        .service(service);

    // Simulate burst of requests
    let mut handles = vec![];
    for i in 0..10 {
        // Clone the service for each request (clones share the limit)
        let mut svc = service.clone();
        handles.push(tokio::spawn(async move {
            match svc.ready().await {
                Ok(svc) => svc.call(format!("request-{}", i)).await,
                Err(e) => {
                    println!("Request {} rejected: {:?}", i, e);
                    Err(e)
                }
            }
        }));
    }

    for (i, handle) in handles.into_iter().enumerate() {
        match handle.await {
            Ok(Ok(response)) => println!("Request {}: {}", i, response),
            Ok(Err(e)) => println!("Request {}: Error: {:?}", i, e),
            Err(e) => println!("Request {}: Join error: {}", i, e),
        }
    }
    Ok(())
}
LoadShed pairs naturally with ConcurrencyLimit for effective overload protection.
use tower::ServiceBuilder;
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Typical production middleware stack with load shedding
let backend = tower::service_fn(|req: String| async move {
// Simulate backend work
tokio::time::sleep(Duration::from_millis(10)).await;
Ok::<_, std::convert::Infallible>(format!("Response to: {}", req))
});
let _service = ServiceBuilder::new()
// Layer order matters! (outer to inner)
.timeout(Duration::from_secs(5)) // Total request timeout
.load_shed() // Reject if at capacity
.concurrency_limit(100) // Max concurrent requests
.rate_limit(1000, Duration::from_secs(1)) // Rate limit
.service(backend);
// Request flow (outermost layer first):
// 1. Timeout: start the overall deadline for the request
// 2. Load shed: if the layers below are not ready, fail immediately
// 3. Concurrency limit: acquire a slot (at most 100 in flight)
// 4. Rate limit: stay within 1000 requests per second
// 5. Backend: process the request
println!("Middleware stack configured with load shedding");
Ok(())
}
LoadShed fits into Tower's layered architecture for comprehensive resilience.
use tower::{Service, ServiceBuilder, ServiceExt};
use tower::load_shed::error::Overloaded;
use std::time::Duration;
use tokio::time::sleep;

#[derive(Debug)]
#[allow(dead_code)]
enum ServiceError {
    Overloaded,
    Timeout,
    Other(String),
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let backend = tower::service_fn(|req: String| async move {
        sleep(Duration::from_millis(100)).await;
        Ok::<_, std::convert::Infallible>(format!("OK: {}", req))
    });

    let mut service = ServiceBuilder::new()
        .load_shed()
        .concurrency_limit(2)
        .service(backend);

    // When handling load shed errors, you have options:
    // 1. Return error to caller
    // 2. Retry with backoff
    // 3. Serve from cache
    // 4. Route to fallback service
    async fn handle_request(
        service: &mut impl Service<String, Response = String, Error = tower::BoxError>,
        request: String,
    ) -> Result<String, ServiceError> {
        match service.ready().await {
            Ok(svc) => match svc.call(request).await {
                Ok(response) => Ok(response),
                Err(e) => {
                    // LoadShed surfaces its rejection from call() as a
                    // boxed Overloaded error, which we can downcast
                    if e.is::<Overloaded>() {
                        Err(ServiceError::Overloaded)
                    } else {
                        Err(ServiceError::Other(e.to_string()))
                    }
                }
            },
            // poll_ready errors come from the inner service itself
            Err(e) => Err(ServiceError::Other(e.to_string())),
        }
    }

    // Alternative: implement fallback logic
    async fn request_with_fallback(
        primary: &mut impl Service<String, Response = String, Error = tower::BoxError>,
        fallback: &str,
        request: String,
    ) -> String {
        match primary.ready().await {
            Ok(svc) => match svc.call(request).await {
                Ok(response) => response,
                Err(_) => format!("Fallback: {}", fallback),
            },
            Err(_) => format!("Fallback: {}", fallback),
        }
    }

    match handle_request(&mut service, "hello".to_string()).await {
        Ok(resp) => println!("handle_request: {}", resp),
        Err(e) => println!("handle_request error: {:?}", e),
    }
    let resp = request_with_fallback(&mut service, "cached value", "world".to_string()).await;
    println!("request_with_fallback: {}", resp);
    Ok(())
}
Load shed errors should trigger fallback strategies rather than propagate to users.
// Different strategies for handling overload:
// 1. Queue-based (without LoadShed):
// - Requests wait in queue
// - Latency increases under load
// - Eventually times out or runs out of memory
// - "Fail slow" approach
// 2. Load Shedding (with LoadShed):
// - Requests rejected immediately when at capacity
// - Latency stays stable for accepted requests
// - Callers get fast feedback
// - "Fail fast" approach
// 3. Adaptive Load Shedding:
// - Adjust capacity based on response times
// - More sophisticated but more complex
// - tower's LoadShed is simpler (sheds on readiness; pair with ConcurrencyLimit for fixed capacity)
// 4. Circuit Breaker:
// - Different concern: fail when error rate is high
// - Can combine with load shedding
// - Load shed prevents overload, circuit breaker handles failures
fn main() {
// LoadShed vs other patterns:
// - ConcurrencyLimit without LoadShed: queues requests
// - LoadShed + ConcurrencyLimit: rejects at capacity
// - CircuitBreaker: rejects when error rate is high
// - RateLimit: rejects when request rate is high
// They serve different purposes:
// - RateLimit: protect against external traffic spikes
// - LoadShed: protect against internal capacity limits
// - CircuitBreaker: protect against downstream failures
println!("Load shedding complements other resilience patterns");
}
Load shedding is one of several complementary resilience patterns.
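The queue-versus-shed trade-off above can be made concrete with a tiny virtual-time model (plain std, no tower; the capacity and timing numbers are made up for illustration): a capacity-3 server with a 10-tick service time receives 12 simultaneous requests. Queueing eventually serves all of them but at steadily growing latency; shedding serves only the first 3 at stable latency and rejects the rest instantly.

```rust
// Virtual-time model: capacity-3 server, each request takes 10 ticks,
// 12 requests all arrive at tick 0.
const CAPACITY: usize = 3;
const SERVICE_TICKS: u64 = 10;
const ARRIVALS: u64 = 12;

// Strategy 1: queue everything. Request i runs in batch i / CAPACITY,
// so it waits for all earlier batches before being served.
fn queued_latency(i: u64) -> u64 {
    let batch = i / CAPACITY as u64;
    (batch + 1) * SERVICE_TICKS
}

// Strategy 2: shed at capacity. Only the first CAPACITY requests are
// served; the rest are rejected immediately (None).
fn shed_latency(i: u64) -> Option<u64> {
    if i < CAPACITY as u64 { Some(SERVICE_TICKS) } else { None }
}

fn main() {
    let worst_queued = queued_latency(ARRIVALS - 1);
    println!("queueing: all {} served, worst-case latency {} ticks", ARRIVALS, worst_queued);
    let served = (0..ARRIVALS).filter(|&i| shed_latency(i).is_some()).count();
    println!(
        "shedding: {} served at {} ticks each, {} rejected instantly",
        served,
        SERVICE_TICKS,
        ARRIVALS as usize - served
    );
}
```

With these numbers the last queued request waits 4 full batches (40 ticks), while every accepted request under shedding finishes in 10, which is the "stable latency for accepted requests" row in the comparison.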
use tower::{Service, ServiceBuilder, ServiceExt};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;

#[derive(Clone)]
struct Metrics {
    requests_total: Arc<AtomicU64>,
    requests_shed: Arc<AtomicU64>,
    requests_success: Arc<AtomicU64>,
}

impl Metrics {
    fn new() -> Self {
        Self {
            requests_total: Arc::new(AtomicU64::new(0)),
            requests_shed: Arc::new(AtomicU64::new(0)),
            requests_success: Arc::new(AtomicU64::new(0)),
        }
    }

    fn shed_rate(&self) -> f64 {
        let total = self.requests_total.load(Ordering::Relaxed);
        let shed = self.requests_shed.load(Ordering::Relaxed);
        if total == 0 {
            0.0
        } else {
            shed as f64 / total as f64
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let metrics = Metrics::new();
    let backend = tower::service_fn(|_req: String| async move {
        tokio::time::sleep(Duration::from_millis(10)).await;
        Ok::<_, tower::BoxError>("response".to_string())
    });

    let service = ServiceBuilder::new()
        .load_shed()
        .concurrency_limit(3)
        .service(backend);

    // Simulate a burst of requests and record outcomes
    let mut handles = vec![];
    for i in 0..20 {
        let mut svc = service.clone();
        let metrics = metrics.clone();
        handles.push(tokio::spawn(async move {
            metrics.requests_total.fetch_add(1, Ordering::Relaxed);
            match svc.ready().await {
                Ok(svc) => match svc.call(format!("request-{}", i)).await {
                    Ok(_) => {
                        metrics.requests_success.fetch_add(1, Ordering::Relaxed);
                    }
                    // For this demo, treat any call error as a shed
                    Err(_) => {
                        metrics.requests_shed.fetch_add(1, Ordering::Relaxed);
                    }
                },
                Err(_) => {
                    metrics.requests_shed.fetch_add(1, Ordering::Relaxed);
                }
            }
        }));
    }

    // Wait for all requests to complete
    for handle in handles {
        handle.await?;
    }
    println!("Total requests: {}", metrics.requests_total.load(Ordering::Relaxed));
    println!("Successful: {}", metrics.requests_success.load(Ordering::Relaxed));
    println!("Shed: {}", metrics.requests_shed.load(Ordering::Relaxed));
    println!("Shed rate: {:.2}%", metrics.shed_rate() * 100.0);
    Ok(())
}
Monitoring load shed events helps tune capacity limits and detect issues.
// Load shedding is appropriate when:
// 1. You have bounded capacity
// - Database connections
// - Thread pool size
// - Memory limits
// 2. Your service has SLAs for latency
// - Rejecting fast is better than timing out slow
// - Predictable behavior under load
// 3. Callers can handle rejection
// - Retry logic exists
// - Fallback responses available
// - Graceful degradation possible
// 4. You want to prevent cascading failures
// - Stop requests from piling up
// - Keep the service responsive for accepted requests
// Load shedding might NOT be appropriate when:
// 1. All requests must be processed
// - Financial transactions
// - Critical system events
// 2. Callers cannot handle rejection
// - No retry logic
// - No fallback behavior
// 3. Capacity is hard to predict
// - Variable workloads
// - Shared resources
fn main() {
println!("Use load shedding when fast failure is better than slow failure");
}
Load shedding is valuable but requires callers to handle rejection gracefully.
How LoadShed works:
| Step | Action |
|------|--------|
| 1 | Request arrives at LoadShed middleware |
| 2 | LoadShed calls poll_ready() on inner service |
| 3 | If Ready(Ok), request proceeds to inner service |
| 4 | If Pending, LoadShed returns Ready(Err(Overloaded)) |
| 5 | Caller receives immediate error instead of waiting |
Comparison with alternatives:
| Strategy | Behavior | Latency | Memory |
|----------|----------|---------|--------|
| Queue | Queue requests | Increases under load | Grows with queue |
| Load shed | Reject at capacity | Stable for accepted | Bounded |
| Adaptive | Adjust capacity | Variable | Bounded |
Best practices:
| Practice | Reason |
|----------|--------|
| Set capacity based on benchmarks | Too low wastes resources, too high risks overload |
| Monitor shed rate | High rate indicates need for more capacity |
| Implement caller fallbacks | Rejected requests need handling |
| Combine with rate limiting | Defense in depth against traffic spikes |
| Layer with circuit breaker | Handle both overload and downstream failures |
Key insight: LoadShed embodies the principle that under overload, failing fast is better than failing slow. Without load shedding, requests queue up in front of an overloaded service: each request waits longer, consuming memory and connections while hoping the service will recover. This creates a death spiral where latency compounds, timeouts fire, retries increase load further, and eventually the service exhausts resources and fails completely. LoadShed cuts this cycle short by rejecting requests the service cannot handle, returning errors immediately so callers can respond appropriately. Crucially, LoadShed preserves the service's responsiveness for the requests it does accept: accepted requests complete quickly because the service isn't drowning in queued work. This fail-fast approach shifts the burden from the service (which would struggle under accumulated requests) to the caller (which must handle rejection), but that is exactly where the burden belongs: the caller knows best which fallback strategy to use, whether that's retrying later, serving cached data, or degrading gracefully.
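One caller-side fallback named above, retrying later, can be sketched without tower as a retry loop with exponential backoff. This is a hypothetical stand-in: `svc` is a closure that sheds (returns Err) until the service has capacity again, modeled here as "after 2 calls", and a real async caller would sleep between attempts rather than just printing the delay.

```rust
// Retry-with-backoff sketch for calling a load-shedding service.
fn call_with_backoff<F>(mut svc: F, max_attempts: u32) -> Result<String, String>
where
    F: FnMut() -> Result<String, String>,
{
    let mut backoff_ms = 10u64;
    for attempt in 0..max_attempts {
        match svc() {
            Ok(resp) => return Ok(resp),
            Err(e) if attempt + 1 < max_attempts => {
                // A real caller would await here, e.g.
                // tokio::time::sleep(Duration::from_millis(backoff_ms)).await
                println!("attempt {} shed ({}), backing off {} ms", attempt, e, backoff_ms);
                backoff_ms *= 2; // exponential backoff
            }
            Err(e) => return Err(e),
        }
    }
    Err("no attempts made".to_string())
}

fn main() {
    // Hypothetical service: sheds its first two calls, then recovers.
    let mut calls = 0;
    let svc = || {
        calls += 1;
        if calls <= 2 {
            Err("overloaded".to_string())
        } else {
            Ok("response".to_string())
        }
    };
    match call_with_backoff(svc, 5) {
        Ok(resp) => println!("succeeded: {}", resp),
        Err(e) => println!("gave up: {}", e),
    }
}
```

Capping attempts matters: unbounded retries against a shedding service re-create the retry storm that load shedding is meant to break.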