Rust walkthroughs
How does the tower::load_shed::LoadShed middleware handle overload situations compared to simple queueing?

tower::load_shed::LoadShed proactively rejects requests when the service is overloaded, failing fast rather than allowing requests to accumulate in queues. Unlike queueing, which buffers incoming requests in the hope that the service will catch up, load shedding immediately returns an error to the caller when the service cannot keep up. This approach provides backpressure to upstream systems, prevents cascading failures, and maintains predictable latency for accepted requests, at the cost of rejecting some traffic. Simple queueing, in contrast, accepts all requests but introduces unbounded latency growth and potential memory exhaustion when the arrival rate exceeds the processing rate.
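For orientation, here is a minimal end-to-end sketch of the canonical call pattern, assuming tower 0.4 with the load-shed and util features and a tokio runtime. It shows the mechanics the walkthroughs below rely on: LoadShed always reports itself ready, and an overloaded inner service surfaces as a boxed Overloaded error from the call future.

use tower::Service;
use tower::ServiceBuilder;
use tower::ServiceExt;
use tower::service_fn;
use tower::load_shed::LoadShedLayer;
use tower::load_shed::error::Overloaded;

#[tokio::main]
async fn main() {
    // service_fn turns an async function into a tower Service.
    let inner = service_fn(|req: u32| async move {
        Ok::<_, tower::BoxError>(format!("Processed {}", req))
    });
    let mut service = ServiceBuilder::new()
        .layer(LoadShedLayer::new())
        .service(inner);

    // ready() drives poll_ready; LoadShed itself never blocks here.
    let service = service.ready().await.expect("LoadShed is always ready");
    match service.call(7).await {
        Ok(response) => println!("{}", response),
        // A shed request resolves to a boxed Overloaded error.
        Err(e) if e.is::<Overloaded>() => println!("rejected: overloaded"),
        Err(e) => println!("failed: {}", e),
    }
}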
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;
use tokio::time::sleep;

// Simulated slow service
async fn slow_service(request: u32) -> String {
    sleep(Duration::from_millis(100)).await;
    format!("Processed request {}", request)
}

// Without any protection. A single-permit semaphore models a
// downstream service that can handle only one request at a time.
async fn no_protection_example() {
    let capacity = Arc::new(Semaphore::new(1));
    let requests: Vec<u32> = (0..100).collect();
    // All 100 requests arrive almost simultaneously
    let start = std::time::Instant::now();
    let mut handles = Vec::new();
    for req in requests {
        let capacity = Arc::clone(&capacity);
        handles.push(tokio::spawn(async move {
            // Each task parks here until the service has capacity.
            let _permit = capacity.acquire().await.unwrap();
            let result = slow_service(req).await;
            println!("Completed: {} after {:?}", result, start.elapsed());
        }));
    }
    for handle in handles {
        handle.await.unwrap();
    }
    // All requests eventually complete, but:
    // - Memory: 100 tasks queued behind the semaphore
    // - Latency: the last request waits ~10 seconds (100 x 100ms)
    // - No feedback to the caller about overload
}

Without protection, overload causes memory growth and unpredictable latency.
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::sleep;

// Simple queue-based service
struct QueuedService {
    sender: mpsc::Sender<u32>,
}

impl QueuedService {
    fn new(queue_size: usize) -> Self {
        let (sender, mut receiver) = mpsc::channel::<u32>(queue_size);
        tokio::spawn(async move {
            while let Some(request) = receiver.recv().await {
                sleep(Duration::from_millis(100)).await;
                println!("Processed: {}", request);
            }
        });
        Self { sender }
    }

    async fn submit(&self, request: u32) -> Result<(), String> {
        // send().await waits for queue space; it only errors if the
        // worker task has dropped the receiver.
        self.sender.send(request).await
            .map_err(|_| "Service worker stopped".to_string())
    }
}

async fn queue_example() {
    let service = QueuedService::new(10);
    // Eleven requests: the worker takes one in flight,
    // and the next ten fill the queue.
    for i in 0..11 {
        service.submit(i).await.unwrap();
        println!("Queued: {}", i);
    }
    // The twelfth request blocks until the queue has space
    let start = std::time::Instant::now();
    service.submit(11).await.unwrap();
    println!("Request 11 queued after {:?}", start.elapsed());
    // Problems:
    // 1. The caller waits for queue space (high latency)
    // 2. No clear signal that the service is overloaded
    // 3. Queued requests hold memory
    // 4. Latency increases as the queue builds
}

Queueing accepts requests but makes callers wait, increasing latency without clear feedback.
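The gap can be narrowed from the queueing side too: tokio's bounded mpsc::Sender::try_send fails immediately with TrySendError::Full instead of waiting, which is load shedding in miniature. A minimal sketch; the submit_or_shed helper is hypothetical, not part of the example above:

use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;

// Hypothetical variant of submit(): shed instead of waiting for space.
fn submit_or_shed(sender: &mpsc::Sender<u32>, request: u32) -> Result<(), String> {
    sender.try_send(request).map_err(|e| match e {
        // The queue is full: reject immediately rather than block the caller.
        TrySendError::Full(_) => "overloaded: queue full".to_string(),
        // The worker task has stopped and dropped the receiver.
        TrySendError::Closed(_) => "service worker stopped".to_string(),
    })
}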
use tower::Service;
use tower::ServiceBuilder;
use tower::ServiceExt;
use tower::load_shed::LoadShedLayer;
use tower::load_shed::error::Overloaded;
use std::pin::Pin;
use std::future::Future;
use std::task::{Context, Poll};

// Simple service that processes requests
#[derive(Clone)]
struct ProcessingService;

impl Service<u32> for ProcessingService {
    type Response = String;
    type Error = String;
    type Future = Pin<Box<dyn Future<Output = Result<String, String>> + Send>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Always ready to accept (simulated)
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, request: u32) -> Self::Future {
        Box::pin(async move {
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            Ok(format!("Processed {}", request))
        })
    }
}

async fn load_shed_example() {
    let mut service = ServiceBuilder::new()
        .layer(LoadShedLayer::new())
        .service(ProcessingService);

    // ready() drives poll_ready(). LoadShed polls the inner service here
    // but always reports itself ready, so callers never wait.
    let service = service.ready().await.expect("readiness is infallible here");

    // If the inner service was not ready, the call resolves to an
    // Overloaded error instead of being queued.
    match service.call(1).await {
        Ok(result) => println!("Result: {}", result),
        Err(e) if e.is::<Overloaded>() => println!("Service overloaded, request rejected!"),
        Err(e) => println!("Request failed: {}", e),
    }
}

LoadShed rejects requests when the underlying service isn't ready.
use tower::Service;
use tower::ServiceBuilder;
use tower::ServiceExt;
use tower::load_shed::LoadShedLayer;
use tower::load_shed::error::Overloaded;
use std::pin::Pin;
use std::future::Future;
use std::task::{Context, Poll};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Service that tracks concurrency and signals overload
#[derive(Clone)]
struct ConcurrencyLimitedService {
    in_flight: Arc<AtomicUsize>,
    max_concurrency: usize,
}

impl ConcurrencyLimitedService {
    fn new(max_concurrency: usize) -> Self {
        Self {
            in_flight: Arc::new(AtomicUsize::new(0)),
            max_concurrency,
        }
    }
}

impl Service<u32> for ConcurrencyLimitedService {
    type Response = String;
    type Error = String;
    type Future = Pin<Box<dyn Future<Output = Result<String, String>> + Send>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Check whether we can accept more work. A production service
        // would also store the waker to signal readiness later; LoadShed
        // does not need that, since it re-polls before every request.
        if self.in_flight.load(Ordering::Relaxed) < self.max_concurrency {
            Poll::Ready(Ok(()))
        } else {
            // Service is at capacity - LoadShed will reject
            Poll::Pending
        }
    }

    fn call(&mut self, request: u32) -> Self::Future {
        self.in_flight.fetch_add(1, Ordering::Relaxed);
        let in_flight = self.in_flight.clone();
        Box::pin(async move {
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            in_flight.fetch_sub(1, Ordering::Relaxed);
            Ok(format!("Processed {}", request))
        })
    }
}

async fn load_shed_with_capacity() {
    let mut service = ServiceBuilder::new()
        .layer(LoadShedLayer::new())
        .service(ConcurrencyLimitedService::new(5));

    // LoadShed behavior:
    // 1. poll_ready() polls the inner service but always reports Ready(Ok)
    // 2. If the inner service was ready, call() forwards the request
    // 3. If it was Pending, call() resolves to an Overloaded error
    println!("Sending requests...");
    // Issue all ten calls before awaiting any response, so the first
    // five occupy the service and the rest are shed.
    let mut responses = Vec::new();
    for i in 0..10 {
        let svc = service.ready().await.expect("readiness is infallible here");
        responses.push(svc.call(i));
    }
    for (i, response) in responses.into_iter().enumerate() {
        match response.await {
            Ok(result) => println!("Request {}: {}", i, result),
            Err(e) if e.is::<Overloaded>() => {
                println!("Request {} rejected: service overloaded", i)
            }
            Err(e) => println!("Request {} failed: {}", i, e),
        }
    }
}

LoadShed checks the inner service's poll_ready() and rejects when it signals that it cannot accept more work.
use tower::Service;
use tower::ServiceBuilder;
use tower::ServiceExt;
use tower::load_shed::LoadShedLayer;
use std::pin::Pin;
use std::future::Future;
use std::task::{Context, Poll};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

#[derive(Clone)]
struct SlowService {
    in_flight: Arc<AtomicUsize>,
}

impl Service<u32> for SlowService {
    type Response = String;
    type Error = String;
    type Future = Pin<Box<dyn Future<Output = Result<String, String>> + Send>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Ready only while fewer than three requests are in flight.
        if self.in_flight.load(Ordering::Relaxed) < 3 {
            Poll::Ready(Ok(()))
        } else {
            Poll::Pending
        }
    }

    fn call(&mut self, request: u32) -> Self::Future {
        self.in_flight.fetch_add(1, Ordering::Relaxed);
        let in_flight = self.in_flight.clone();
        Box::pin(async move {
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            in_flight.fetch_sub(1, Ordering::Relaxed);
            Ok(format!("Processed {}", request))
        })
    }
}

async fn latency_comparison() {
    println!("=== Load Shedding ===");
    let mut shed_service = ServiceBuilder::new()
        .layer(LoadShedLayer::new())
        .service(SlowService {
            in_flight: Arc::new(AtomicUsize::new(0)),
        });

    let start = std::time::Instant::now();
    // Issue all calls up front so the burst exceeds capacity;
    // the first three are accepted and the rest are shed.
    let mut responses = Vec::new();
    for i in 0..10 {
        let svc = shed_service.ready().await.expect("readiness is infallible here");
        responses.push(svc.call(i));
    }
    for (i, response) in responses.into_iter().enumerate() {
        match response.await {
            Ok(_) => println!("Request {} completed at {:?}", i, start.elapsed()),
            Err(_) => println!("Request {} REJECTED at {:?}", i, start.elapsed()),
        }
    }
    // Load shedding: accepted requests complete quickly,
    // rejected requests get immediate feedback, and latency
    // stays predictable for accepted requests.

    println!("\n=== Queueing (simulated) ===");
    // Queueing: all requests wait in the queue, latency grows with
    // queue length, and the last request waits for every earlier one.
}

Load shedding provides predictable latency; queueing causes latency to grow with queue depth.
use tower::Service;
use tower::ServiceBuilder;
use tower::ServiceExt;
use tower::load_shed::LoadShedLayer;
use tower::load_shed::error::Overloaded;
use std::pin::Pin;
use std::future::Future;
use std::task::{Context, Poll};

#[derive(Clone)]
struct UnreadyService;

impl Service<u32> for UnreadyService {
    type Response = String;
    type Error = String;
    type Future = Pin<Box<dyn Future<Output = Result<String, String>> + Send>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Service never ready - simulating permanent overload
        Poll::Pending
    }

    fn call(&mut self, _request: u32) -> Self::Future {
        // LoadShed never forwards a call while poll_ready is Pending
        Box::pin(async { Ok("never reached".to_string()) })
    }
}

async fn handle_overload_error() {
    let mut service = ServiceBuilder::new()
        .layer(LoadShedLayer::new())
        .service(UnreadyService);

    // LoadShed reports itself ready even though the inner service is
    // Pending; the rejection surfaces from the call future instead.
    let service = service.ready().await.expect("readiness is infallible here");
    match service.call(1).await {
        Err(e) if e.is::<Overloaded>() => {
            // LoadShed resolved the call to an Overloaded error
            println!("Service overloaded: {}", e);
            // Caller can:
            // 1. Return the error to the client (fail fast)
            // 2. Retry with backoff
            // 3. Route to a different service
        }
        other => unreachable!("inner service is never ready: {:?}", other),
    }
    // The Overloaded error type indicates:
    // - The service cannot accept more work
    // - The request was never started
    // - No resources were consumed
}

LoadShed returns an Overloaded error that callers can handle explicitly.
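One of the options listed above, retry with backoff, is worth sketching: because Overloaded means the request never started, it is safe to retry. The call_with_backoff helper below is hypothetical, assuming the service's error type is tower::BoxError (which LoadShed produces):

use std::time::Duration;
use tower::Service;
use tower::ServiceExt;
use tower::load_shed::error::Overloaded;

// Hypothetical helper: retry shed requests with exponential backoff.
// Safe because an Overloaded request was never started by the service.
async fn call_with_backoff<S>(service: &mut S, request: u32) -> Result<S::Response, S::Error>
where
    S: Service<u32, Error = tower::BoxError>,
{
    let mut delay = Duration::from_millis(10);
    for _ in 0..3 {
        match service.ready().await?.call(request).await {
            // Only shed requests are retried; other errors bubble up.
            Err(e) if e.is::<Overloaded>() => {
                tokio::time::sleep(delay).await;
                delay *= 2; // exponential backoff
            }
            other => return other,
        }
    }
    // Final attempt after the retries are exhausted.
    service.ready().await?.call(request).await
}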
use tower::Service;
use tower::ServiceBuilder;
use tower::ServiceExt;
use tower::load_shed::LoadShedLayer;
use tower::load_shed::error::Overloaded;
use tower::limit::concurrency::ConcurrencyLimitLayer;
use tower::buffer::BufferLayer;
use std::pin::Pin;
use std::future::Future;
use std::task::{Context, Poll};

#[derive(Clone)]
struct RealService;

impl Service<String> for RealService {
    type Response = String;
    type Error = String;
    type Future = Pin<Box<dyn Future<Output = Result<String, String>> + Send>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, request: String) -> Self::Future {
        Box::pin(async move {
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            Ok(format!("Response to: {}", request))
        })
    }
}

async fn combined_middleware() {
    // Layer order matters:
    // LoadShed -> ConcurrencyLimit -> Buffer -> Service
    let mut service = ServiceBuilder::new()
        // Reject if the stack below isn't ready
        .layer(LoadShedLayer::new())
        // Limit concurrent requests
        .layer(ConcurrencyLimitLayer::new(10))
        // Buffer requests for async processing
        .layer(BufferLayer::new(5))
        .service(RealService);

    // This combination provides:
    // 1. Buffer: small queue for temporary bursts
    // 2. ConcurrencyLimit: hard limit on in-flight requests
    // 3. LoadShed: reject when the limit is reached (instead of waiting)
    let service = service.ready().await.expect("readiness is infallible here");
    match service.call("test".to_string()).await {
        Ok(result) => println!("{}", result),
        Err(e) if e.is::<Overloaded>() => println!("Overloaded - request rejected"),
        Err(e) => println!("Request failed: {}", e),
    }
}

Layer LoadShed over other middleware to reject before queueing.
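If LoadShedLayer sat below ConcurrencyLimitLayer instead, callers would wait on the limiter's readiness before the shedder ever saw the request, defeating the purpose; placing LoadShed outermost ensures rejection happens before any waiting.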
// === QUEUEING BEHAVIOR ===
//
// Request arrives → Queue → Wait → Service processes → Response
//
// Pros:
// - All requests eventually processed
// - No dropped requests (if queue doesn't overflow)
//
// Cons:
// - Latency grows unboundedly
// - Memory grows with queue size
// - No feedback about overload
// - Caller waits indefinitely
// - Cascading delays to upstream systems
// === LOAD SHEDDING BEHAVIOR ===
//
// Request arrives → Check capacity →
// If capacity available: Process → Response
// If at capacity: Immediately reject → Error
//
// Pros:
// - Predictable latency for accepted requests
// - Immediate feedback to caller
// - Bounded memory usage
// - Prevents cascading failures
// - Clear signal for autoscaling
//
// Cons:
// - Some requests rejected
// - Requires caller to handle errors
// - May need retry logic
async fn comparison_summary() {
// Queueing: Accept everything, degrade gracefully
// Load Shedding: Reject when overloaded, maintain quality
// Use queueing when:
// - All requests must be processed eventually
// - Bursty traffic with recovery periods
// - Can tolerate variable latency
// Use load shedding when:
// - Predictable latency is critical
// - Memory usage must stay bounded
// - Callers can retry or fail gracefully
// - Preventing cascading failures matters
}

Queueing optimizes for throughput; load shedding optimizes for latency stability.
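For a rough sense of scale: if requests arrive at 20 per second but the service completes 10 per second, an unbounded queue grows by 10 requests every second; after one minute, 600 requests are queued and a newly accepted request waits about 60 seconds. A load-shedding service at the same rates rejects roughly half the traffic but keeps latency flat for the half it accepts.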
use tower::Service;
use tower::ServiceBuilder;
use tower::ServiceExt;
use tower::load_shed::LoadShedLayer;
use tower::load_shed::error::Overloaded;
use tower::limit::concurrency::ConcurrencyLimitLayer;
use std::pin::Pin;
use std::future::Future;
use std::task::{Context, Poll};

struct HttpRequest;

struct HttpResponse {
    status: u16,
}

#[derive(Clone)]
struct WebService;

impl Service<HttpRequest> for WebService {
    type Response = HttpResponse;
    type Error = String;
    type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, String>> + Send>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, _request: HttpRequest) -> Self::Future {
        Box::pin(async move {
            // Simulate processing
            tokio::time::sleep(std::time::Duration::from_millis(5)).await;
            Ok(HttpResponse { status: 200 })
        })
    }
}

async fn web_service_with_load_shed() {
    let mut service = ServiceBuilder::new()
        .layer(LoadShedLayer::new())
        .layer(ConcurrencyLimitLayer::new(100))
        .service(WebService);

    // Typical web service setup:
    // - ConcurrencyLimit caps in-flight requests at 100
    // - LoadShed rejects request 101+ immediately
    // - Clients receive 503 Service Unavailable
    // - Latency for accepted requests stays stable
    let service = service.ready().await.expect("readiness is infallible here");
    match service.call(HttpRequest).await {
        Ok(response) => println!("Status: {}", response.status),
        Err(e) if e.is::<Overloaded>() => {
            // Map the shed request to 503 Service Unavailable
            println!("Status: 503 Service Unavailable");
        }
        Err(e) => println!("Status: 500 ({})", e),
    }
}

For web services, combine LoadShed with ConcurrencyLimit for effective overload protection.
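To make the 503 mapping reusable, a small sketch; status_for_error is a hypothetical helper, not a tower API:

use tower::load_shed::error::Overloaded;

// Hypothetical helper: translate a service error into an HTTP status
// code. Shed requests become 503 so clients and load balancers know
// to back off or route elsewhere.
fn status_for_error(e: &tower::BoxError) -> u16 {
    if e.is::<Overloaded>() {
        503 // Service Unavailable
    } else {
        500 // Internal Server Error
    }
}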
async fn backpressure_propagation() {
    // Load shedding creates backpressure:
    //
    // Client → Load Balancer → Service (with LoadShed)
    //                               ↓
    //                      Overloaded? Reject
    //
    // When the service rejects:
    // 1. The load balancer sees errors
    // 2. It can route to a different instance
    // 3. It can reduce the request rate
    // 4. An autoscaler can add capacity

    // Queueing hides overload:
    //
    // Client → Queue → Service
    //            ↓
    //     Delays grow silently
    //     Memory grows
    //     No signal to upstream systems
    //     Cascading timeouts

    // Load shedding provides visibility:
    // - Error rate directly indicates overload
    // - Easy to monitor and alert on
    // - Clear signal for scaling decisions
}

Load shedding provides visible feedback; queueing hides problems until failure.
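As a sketch of that visibility (the counters and record_outcome helper are hypothetical, standing in for a real metrics library):

use std::sync::atomic::{AtomicU64, Ordering};
use tower::load_shed::error::Overloaded;

// Hypothetical counters an application might export to its metrics
// system; the shed rate is a direct, alertable overload signal.
static ACCEPTED: AtomicU64 = AtomicU64::new(0);
static SHED: AtomicU64 = AtomicU64::new(0);

fn record_outcome<T>(result: &Result<T, tower::BoxError>) {
    match result {
        Ok(_) => {
            ACCEPTED.fetch_add(1, Ordering::Relaxed);
        }
        Err(e) if e.is::<Overloaded>() => {
            SHED.fetch_add(1, Ordering::Relaxed);
        }
        // Other failures would be tracked under a separate counter.
        Err(_) => {}
    }
}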
| Aspect | Load Shedding | Queueing |
|--------|---------------|----------|
| Request handling | Reject when full | Buffer when full |
| Latency | Predictable for accepted | Grows with queue |
| Memory | Bounded | Can grow unbounded |
| Feedback | Immediate error | Silent degradation |
| Caller behavior | Must handle rejection | Waits indefinitely |
| Cascading failures | Prevented | Can cascade |
| Monitoring | Error rate shows overload | Hidden until timeout |
| Use case | Latency-critical, bounded resources | Throughput-critical, can wait |
tower::load_shed::LoadShed and queueing represent fundamentally different approaches to overload:

LoadShed Strategy:
- Checks the inner service's poll_ready() before accepting requests
- Returns an Overloaded error when the service cannot accept more work
- Never buffers: a rejected request is never started and consumes no resources

Queueing Strategy:
- Buffers requests until the service can process them
- Keeps callers waiting, with latency and memory growing as the queue deepens
- Gives no explicit overload signal to callers or upstream systems
Key insight: Load shedding trades acceptance rate for latency stability. By rejecting requests proactively, it protects both the service (from resource exhaustion) and the caller (from unpredictable delays). This fail-fast approach integrates well with distributed systems patterns like circuit breakers, retry with backoff, and autoscaling—all of which rely on visible error signals rather than silently growing queues. Use load shedding when predictable behavior matters more than maximum throughput; use queueing when every request must eventually be processed and latency variance is acceptable.