Rust walkthroughs
What is tower::buffer::BufferLayer and how does it provide backpressure in service pipelines?

tower::buffer::BufferLayer wraps a service with a bounded queue: requests are sent over a channel to a dedicated worker task that drives the inner service, so the request arrival rate is decoupled from the processing rate and bursts queue up rather than overwhelming the service. Backpressure comes through the Service readiness protocol: poll_ready reserves a slot in the queue, and while the buffer is full it returns Poll::Pending, pausing callers until capacity frees up. Callers that should fail fast instead of waiting can layer load shedding on top of the buffer, which converts a not-ready buffer into an immediate overload error that the caller can handle explicitly—retry, shed load, or propagate the pressure upstream.
use std::time::Duration;

use tokio::time::sleep;
use tower::buffer::BufferLayer;
use tower::service_fn;
use tower::ServiceBuilder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a service that processes requests slowly
    let slow_service = service_fn(|request: String| async move {
        sleep(Duration::from_millis(100)).await;
        Ok::<_, String>(format!("Processed: {}", request))
    });
    // Wrap with a buffer of 10 requests
    let _buffered = ServiceBuilder::new()
        .layer(BufferLayer::new(10))
        .service(slow_service);
    // Up to 10 requests can sit in the queue; while it is full,
    // poll_ready returns Pending and callers wait for a free slot
    Ok(())
}

BufferLayer::new(10) creates a buffer that can hold up to 10 pending requests.
use std::time::Duration;

use tokio::time::sleep;
use tower::buffer::BufferLayer;
use tower::service_fn;
use tower::{Service, ServiceBuilder, ServiceExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Service that processes one request at a time (slowly)
    let service = service_fn(|req: String| async move {
        sleep(Duration::from_millis(50)).await;
        Ok::<_, String>(format!("Done: {}", req))
    });
    // Buffer with capacity 3
    let mut buffered = ServiceBuilder::new()
        .layer(BufferLayer::new(3))
        .service(service);
    // How backpressure works:
    // 1. Up to 3 requests can occupy buffer slots at once
    // 2. While all slots are taken, poll_ready returns Pending
    // 3. Callers awaiting ready() are paused until a slot frees
    // Drive the service through the readiness protocol before calling
    let response = buffered.ready().await?.call("request 1".to_string()).await?;
    println!("Response: {}", response);
    Ok(())
}

The buffer provides backpressure by pausing callers (Pending readiness) when capacity is exhausted.
use std::convert::Infallible;
use std::time::Duration;

use tokio::time::sleep;
use tower::buffer::BufferLayer;
use tower::service_fn;
use tower::{Service, ServiceBuilder, ServiceExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let service = service_fn(|_req: ()| async {
        sleep(Duration::from_millis(100)).await;
        Ok::<_, Infallible>("done")
    });
    let mut buffered = ServiceBuilder::new()
        .layer(BufferLayer::new(2)) // Very small buffer
        .service(service);
    // Occupy both buffer slots without awaiting the responses yet
    let fut1 = buffered.ready().await?.call(());
    let fut2 = buffered.ready().await?.call(());
    // The buffer is now at capacity: this ready() stays Pending until
    // the worker task drains a slot -- that waiting is the backpressure
    let fut3 = buffered.ready().await?.call(());
    let (r1, r2, r3) = tokio::join!(fut1, fut2, fut3);
    println!("{:?} {:?} {:?}", r1, r2, r3);
    Ok(())
}

When the buffer is full, poll_ready returns Pending: callers wait for capacity rather than receiving an immediate error.
use std::time::Duration;

use tower::buffer::BufferLayer;
use tower::limit::ConcurrencyLimitLayer;
use tower::service_fn;
use tower::ServiceBuilder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let service = service_fn(|req: String| async move {
        // Simulated work
        tokio::time::sleep(Duration::from_millis(10)).await;
        Ok::<_, String>(format!("Handled: {}", req))
    });
    // Combine the buffer with a concurrency limit; layers listed first
    // are outermost, so the buffer sits in front of the limit
    let _layered = ServiceBuilder::new()
        // Outer: buffer up to 20 incoming requests
        .layer(BufferLayer::new(20))
        // Inner: allow at most 5 requests to be processed concurrently
        .layer(ConcurrencyLimitLayer::new(5))
        .service(service);
    // Request flow:
    // 1. Request arrives and takes a buffer slot (or waits for one)
    // 2. The worker forwards it once a concurrency permit is free
    // 3. ConcurrencyLimit allows max 5 concurrent calls
    // 4. When one completes, the next buffered request starts
    // This provides two-level backpressure:
    // - ConcurrencyLimit: bounds actual work
    // - BufferLayer: absorbs bursts
    Ok(())
}

BufferLayer and ConcurrencyLimitLayer work together for layered backpressure.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

use tower::buffer::BufferLayer;
use tower::service_fn;
use tower::{Service, ServiceBuilder, ServiceExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let in_flight = Arc::new(AtomicUsize::new(0));
    let in_flight_clone = Arc::clone(&in_flight);
    let service = service_fn(move |_req: ()| {
        let counter = Arc::clone(&in_flight_clone);
        async move {
            // Increment in-flight count
            let current = counter.fetch_add(1, Ordering::SeqCst) + 1;
            println!("In-flight: {}", current);
            // Simulate work
            tokio::time::sleep(Duration::from_millis(50)).await;
            // Decrement
            counter.fetch_sub(1, Ordering::SeqCst);
            Ok::<_, std::convert::Infallible>(current)
        }
    });
    // Buffer with 5 slots in front of the worker task
    let mut buffered = ServiceBuilder::new()
        .layer(BufferLayer::new(5))
        .service(service);
    // The buffer's slots act like semaphore permits:
    // - poll_ready acquires a permit; Pending when none are left
    // - as the worker drains requests, permits are released
    // - this prevents unlimited request accumulation
    let first = buffered.ready().await?.call(()).await?;
    println!("First response: {}", first);
    Ok(())
}

The buffer effectively acts as a bounded queue/semaphore for requests.
use std::convert::Infallible;
use std::time::Duration;

use tokio::time::sleep;
use tower::buffer::BufferLayer;
use tower::service_fn;
use tower::{ServiceBuilder, ServiceExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Service that processes at a fixed rate (1 per 100ms)
    let slow_processor = service_fn(|req: u32| async move {
        sleep(Duration::from_millis(100)).await;
        Ok::<_, Infallible>(format!("Processed request {}", req))
    });
    let buffered = ServiceBuilder::new()
        .layer(BufferLayer::new(10))
        .service(slow_processor);
    // Requests arrive in bursts.
    // Without the buffer: the burst would overwhelm the service.
    // With the buffer: requests queue up and the service processes
    // them at its own pace.
    // Burst of 15 requests arriving at once
    let mut tasks = Vec::new();
    for i in 0..15 {
        let svc = buffered.clone();
        let handle = tokio::spawn(async move {
            // oneshot() drives poll_ready (waiting while the buffer is
            // full) and then awaits the response
            match svc.oneshot(i).await {
                Ok(response) => println!("Request {}: {}", i, response),
                Err(e) => println!("Request {}: buffer worker failed: {}", i, e),
            }
        });
        tasks.push(handle);
    }
    // 10 requests take buffer slots immediately; the other 5 callers
    // wait in poll_ready until slots free up -- none are rejected
    for task in tasks {
        let _ = task.await;
    }
    Ok(())
}

The buffer smooths out request bursts by decoupling arrival from processing.
use tower::buffer::BufferLayer;
use tower::service_fn;
use tower::{ServiceBuilder, ServiceExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The inner service does NOT need to be Clone: it is moved into a
    // single worker task, and Buffer hands out cheap clonable handles
    // that all feed that worker over a shared channel
    let service = service_fn(|req: String| async move {
        Ok::<_, String>(format!("Handled: {}", req))
    });
    // The buffered service itself is Clone
    let buffered = ServiceBuilder::new()
        .layer(BufferLayer::new(100))
        .service(service);
    // Can now clone for use across multiple tasks
    let svc1 = buffered.clone();
    let svc2 = buffered.clone();
    // Both handles share the same underlying buffer
    let handle1 = tokio::spawn(async move {
        svc1.oneshot("request 1".to_string()).await
    });
    let handle2 = tokio::spawn(async move {
        svc2.oneshot("request 2".to_string()).await
    });
    // Both requests go through the shared buffer
    let _ = handle1.await;
    let _ = handle2.await;
    Ok(())
}

BufferLayer makes a service Clone, enabling shared use across tasks.
use tower::buffer::BufferLayer;
use tower::load_shed::LoadShedLayer;
use tower::service_fn;
use tower::ServiceBuilder;

#[tokio::main]
async fn main() {
    // BufferLayer: queue requests when the service is busy
    // - Up to N requests hold buffer slots
    // - Additional callers wait (poll_ready is Pending)
    // - Good for riding out temporary bursts
    // LoadShedLayer: reject instead of waiting
    // - No queue of its own
    // - When the inner service is not ready, the call fails with an
    //   Overloaded error
    // - Good for failing fast under overload
    let service = service_fn(|_req: ()| async {
        Ok::<_, std::convert::Infallible>("done")
    });
    // With BufferLayer: callers queue up to 10 deep, then wait
    let _with_buffer = ServiceBuilder::new()
        .layer(BufferLayer::new(10))
        .service(service.clone());
    // With LoadShed: calls fail immediately whenever the inner
    // service reports it is not ready
    let _with_loadshed = ServiceBuilder::new()
        .layer(LoadShedLayer::new())
        .service(service);
    // Combined (load_shed outside buffer): the buffer absorbs bursts,
    // and load_shed fails fast once the buffer is full
}

BufferLayer queues requests (callers wait); LoadShedLayer rejects immediately when the inner service is overloaded.
use std::time::Duration;

use tower::buffer::BufferLayer;
use tower::load_shed::error::Overloaded;
use tower::load_shed::LoadShedLayer;
use tower::service_fn;
use tower::{Service, ServiceBuilder, ServiceExt};

#[tokio::main]
async fn main() {
    let service = service_fn(|_req: ()| async {
        tokio::time::sleep(Duration::from_millis(100)).await;
        Ok::<_, std::convert::Infallible>("done")
    });
    // load_shed outside the buffer turns "buffer full" (Pending
    // readiness) into an immediate Overloaded error
    let mut svc = ServiceBuilder::new()
        .layer(LoadShedLayer::new())
        .layer(BufferLayer::new(2))
        .service(service);
    // Fill both buffer slots (responses not awaited yet)
    let _fut1 = svc.ready().await.unwrap().call(());
    let _fut2 = svc.ready().await.unwrap().call(());
    // The worker runs concurrently, so whether this next call is shed
    // depends on how quickly it drains the queue; handle both outcomes
    match svc.ready().await.unwrap().call(()).await {
        Ok(response) => println!("Success: {:?}", response),
        Err(e) if e.is::<Overloaded>() => {
            println!("Service overloaded - apply backpressure!");
            // Strategies:
            // 1. Return error to client (503 Service Unavailable)
            // 2. Retry after delay
            // 3. Shed load / circuit break
        }
        Err(e) => println!("Worker error: {}", e),
    }
}

With load_shed layered over the buffer, a full buffer surfaces as an Overloaded error that callers can handle explicitly.
use std::time::Duration;

use tower::buffer::BufferLayer;
use tower::limit::ConcurrencyLimitLayer;
use tower::timeout::TimeoutLayer;
use tower::{Service, ServiceBuilder};

// Conceptual HTTP server pipeline. Buffer spawns its worker task at
// construction time, so this must be called from within a tokio runtime.
fn create_service() -> impl Service<String, Response = String, Error = Box<dyn std::error::Error + Send + Sync>> {
    ServiceBuilder::new()
        // Layer 4 (outermost): buffer incoming requests
        .layer(BufferLayer::new(100))
        // Layer 3: time out long-running requests
        .layer(TimeoutLayer::new(Duration::from_secs(30)))
        // Layer 2: limit concurrent processing
        .layer(ConcurrencyLimitLayer::new(10))
        // Layer 1 (innermost): actual handler
        .service(tower::service_fn(|req: String| async move {
            Ok::<_, Box<dyn std::error::Error + Send + Sync>>(format!("Handled: {}", req))
        }))
}

// Request flow:
// 1. Request arrives -> BufferLayer queues it if a slot is free
// 2. If the queue is full -> poll_ready is Pending and the caller waits
//    (add load_shed outside the buffer to fail fast instead)
// 3. Once dequeued -> waits for a concurrency slot
// 4. ConcurrencyLimit allows processing (max 10 at once)
// 5. Timeout is enforced around processing
// 6. Handler processes the request

BufferLayer is typically placed at the outer edge of service pipelines.
use std::sync::Arc;

use tokio::sync::{mpsc, Mutex};
use tower::buffer::BufferLayer;
use tower::service_fn;
use tower::ServiceBuilder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A hand-rolled worker pool: multiple workers pull from one queue.
    // tokio's mpsc Receiver is not Clone, so the workers share it
    // behind a mutex.
    let (tx, rx) = mpsc::channel::<String>(100);
    let rx = Arc::new(Mutex::new(rx));
    for worker_id in 0..4 {
        let rx = Arc::clone(&rx);
        tokio::spawn(async move {
            loop {
                // Take the lock, receive one request, release the lock
                let request = rx.lock().await.recv().await;
                let Some(request) = request else { break };
                println!("Worker {} processing: {}", worker_id, request);
                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            }
        });
    }
    drop(tx); // senders would feed the queue here
    // BufferLayer works similarly inside:
    // - a bounded queue of requests (an mpsc channel)
    // - a single worker task that drains the queue into the service
    // - the bound provides backpressure (Pending when full)
    let service = service_fn(|req: String| async move {
        Ok::<_, std::convert::Infallible>(format!("Done: {}", req))
    });
    let _buffered = ServiceBuilder::new()
        .layer(BufferLayer::new(100)) // Queue size
        .service(service);
    Ok(())
}

BufferLayer uses an mpsc channel internally for request queuing.
use tower::buffer::BufferLayer;
use tower::service_fn;
use tower::ServiceBuilder;

#[tokio::main]
async fn main() {
    // BufferLayer doesn't expose metrics directly, but you can wrap
    // the stack in your own instrumentation middleware
    let service = service_fn(|req: String| async move {
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        Ok::<_, std::convert::Infallible>(format!("Handled: {}", req))
    });
    // The buffer capacity is fixed, so useful signals include:
    // - time spent waiting in poll_ready (queue wait time)
    // - Overloaded errors, if load_shed is layered on top
    // - request latency (it grows as the buffer fills)
    // - downstream service utilization
    let _buffered = ServiceBuilder::new()
        .layer(BufferLayer::new(100))
        .service(service);
    // Pattern: a custom middleware that timestamps a request before
    // poll_ready and after the response to track buffer pressure
}

Monitor buffer pressure through readiness wait times and latency metrics.
BufferLayer purpose: decouple request arrival from processing with a bounded queue.
Backpressure mechanism: poll_ready reserves a queue slot and returns Pending while the buffer is full, pausing callers until capacity frees.
Internal architecture: call sends the request over an mpsc channel to a worker task and waits for the response.
Position in pipeline: typically at the outer edge of the service stack.
Buffer vs alternatives: the buffer queues and waits; load_shed rejects immediately; a concurrency limit bounds work in progress.
When to use: bursty traffic, or non-Clone services that must be shared across tasks.
When NOT to use: when callers need fast failure (layer load_shed on top instead of waiting) or when added queueing latency is unacceptable.
Key insight: BufferLayer provides a bounded buffer that turns unbounded request arrival into bounded queue depth, enabling backpressure propagation. The key design is that a full buffer surfaces as Pending readiness rather than silent unbounded queueing, which pauses callers and propagates pressure upstream; layering load_shed on top converts that pause into an explicit overload error for callers that must fail fast. Combined with ConcurrencyLimitLayer, you get two-level protection: the buffer absorbs temporary bursts, while the concurrency limit prevents resource exhaustion. The buffer also solves a practical problem: it makes any service Clone by wrapping it in a shared handle, which is essential for multi-worker setups where each worker needs its own service handle but they should share a common queue. This is why BufferLayer often appears at the outer edge of service stacks in production systems.