How does tower::buffer::BufferLayer enable cloning of non-Clone services?
BufferLayer wraps a non-Clone service in a shared buffer backed by a worker task, allowing multiple handle services to clone themselves and send requests through a channel to the single underlying service. The trick is that the wrapper service doesn't need to clone the inner service; it clones a sender to a channel, while the actual service processes requests sequentially from that channel. This pattern is essential for sharing services across multiple request handlers when the service itself cannot be cloned.
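The core trick is visible even without tower: the only thing that gets cloned is a channel sender. A minimal std-only sketch, with std::sync::mpsc and threads standing in for tokio's async channel and tasks:

```rust
use std::sync::mpsc;
use std::thread;

// Returns the messages received by the single consumer, sorted so the
// result is deterministic regardless of thread scheduling.
fn collect_from_handles() -> Vec<String> {
    // One receiver: a stand-in for the single, non-Clone service.
    let (tx, rx) = mpsc::channel::<String>();

    // The *sender* is what gets cloned, not the resource behind it.
    let tx2 = tx.clone();
    thread::spawn(move || tx.send("from handle 1".to_string()).unwrap());
    thread::spawn(move || tx2.send("from handle 2".to_string()).unwrap());

    // Both messages arrive at the same consumer.
    let mut msgs: Vec<String> = rx.iter().take(2).collect();
    msgs.sort();
    msgs
}

fn main() {
    println!("{:?}", collect_from_handles());
}
```

Tower's Buffer applies the same shape with an async bounded channel and a spawned worker task.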
The Problem: Services Need to Be Clone
use tower::Service;
use std::task::{Context, Poll};
use std::pin::Pin;
use std::future::Future;
// Many services have state that can't be cloned
struct DatabaseConnection {
// Connection handles often can't be cloned
connection: SqlConnection,
// Or contain non-Clone types
file_handle: File,
}
// This service CANNOT implement Clone
impl Service<Query> for DatabaseConnection {
type Response = QueryResult; // placeholder response type
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// ...
}
fn call(&mut self, req: Query) -> Self::Future {
// ...
}
}
// Problem: Tower middleware often requires services to be Clone
// For example, when spawning multiple request handlers:
// This won't compile if DatabaseConnection isn't Clone:
// let make_service = MakeService::new(DatabaseConnection::connect()?);
// Server::bind(&addr).serve(make_service).await?;
// Error: DatabaseConnection doesn't implement Clone
Services used in servers need cloning, but many implementations can't be cloned.
The Buffer Solution
use tower::buffer::{Buffer, BufferLayer};
use tower::ServiceBuilder;
use tower::Service;
// BufferLayer wraps any service to make it "cloneable"
// The pattern:
// 1. Move the service into a spawned worker task
// 2. The worker pulls requests from a bounded channel and drives the service
// 3. Clones are just channel senders feeding that worker
async fn example() {
// Non-Clone service
let db = DatabaseConnection::connect().await.unwrap();
// Wrap with Buffer - now it's Clone!
let buffered: Buffer<DatabaseConnection, Query> = Buffer::new(db, 1024);
// `db` is the wrapped service; 1024 is the buffer capacity
// Now you can clone the buffer service
let handle1 = buffered.clone();
let handle2 = buffered.clone();
let handle3 = buffered.clone();
// All three send requests to the SAME underlying service
// The service processes them one at a time (sequentially)
}
Buffer::new creates a clonable wrapper around any service.
What Buffer Actually Does
use tower::buffer::Buffer;
// Conceptual model of how Buffer works:
// Without Buffer:
//
//   Service (non-Clone)
//     └── cannot be shared across handlers
//
// With Buffer:
//
//   Clone 1 ───┐
//              │
//   Clone 2 ───┼──▶ ┌────────────────┐
//              │    │ Channel (mpsc) │
//   Clone 3 ───┘    │   (bounded)    │
//                   └───────┬────────┘
//                           ▼
//                   ┌────────────────┐
//                   │  Worker Task   │
//                   │  (processes    │
//                   │   requests)    │
//                   └───────┬────────┘
//                           ▼
//                   ┌────────────────────┐
//                   │  Original Service  │
//                   │  (single instance) │
//                   └────────────────────┘
//
// Each clone is just a channel sender
// The actual service exists exactly once
Buffer creates a channel-based architecture where clones are just senders.
Using BufferLayer in Middleware Chains
use tower::ServiceBuilder;
use tower::buffer::BufferLayer;
use tower::limit::RateLimitLayer;
use tower::timeout::TimeoutLayer;
use std::time::Duration;
async fn example() {
// Non-Clone service
let service = MyService::new();
// BufferLayer in a ServiceBuilder chain
let service = ServiceBuilder::new()
// Buffer must come before middleware that needs Clone
.layer(BufferLayer::new(1024))
// These layers now work even though MyService isn't Clone
.layer(RateLimitLayer::new(100, Duration::from_secs(1)))
.layer(TimeoutLayer::new(Duration::from_secs(30)))
.service(service);
// The resulting service IS Clone
let clone1 = service.clone();
let clone2 = service.clone();
// Both clones send to the same underlying service
}
// BufferLayer vs Buffer::new:
// - BufferLayer::new(capacity) - used in ServiceBuilder
// - Buffer::new(service, capacity) - direct construction
// Both do the same thing internally
BufferLayer integrates with ServiceBuilder for clean middleware composition.
Capacity and Backpressure
use tower::buffer::Buffer;
async fn capacity_example() {
let service = MyService::new();
// Capacity = 1024 pending requests in channel
let buffered = Buffer::new(service, 1024);
// If 1024 requests are pending:
// - send().await will block (backpressure)
// - poll_ready() will return Pending
// This protects the service from being overwhelmed
// Requests queue up to capacity, then backpressure kicks in
// Choosing capacity:
// - Too small: callers block frequently
// - Too large: memory overhead, latency accumulation
// Common values:
// - 1024: typical for web servers
// - 64-256: for low-latency services
// - Depends on expected request rate and processing time
}
Capacity determines how many requests can queue before backpressure applies.
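Backpressure on a bounded queue can be demonstrated with std's `sync_channel`, playing the role of Buffer's bounded mpsc here (a sketch, not tower's implementation):

```rust
use std::sync::mpsc;

// Fills a bounded channel past its capacity with non-blocking sends.
// Returns (how many sends were accepted, whether the overflow send failed).
// A blocking `send` would park the caller instead of failing — that is
// the backpressure a full Buffer applies via a Pending poll_ready.
fn fill_to_capacity(capacity: usize) -> (usize, bool) {
    let (tx, _rx) = mpsc::sync_channel::<u32>(capacity); // keep _rx alive
    let mut accepted = 0;
    for i in 0..capacity + 1 {
        if tx.try_send(i as u32).is_ok() {
            accepted += 1;
        }
    }
    let overflow_rejected = tx.try_send(99).is_err();
    (accepted, overflow_rejected)
}

fn main() {
    let (accepted, rejected) = fill_to_capacity(4);
    println!("accepted {accepted}, overflow rejected: {rejected}");
    // prints: accepted 4, overflow rejected: true
}
```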
Thread Safety and Concurrency
use tower::buffer::Buffer;
use tower::{Service, ServiceExt};
async fn thread_safety() {
let service = MyService::new();
let buffered = Buffer::new(service, 1024);
// Buffer service is Clone + Send + Sync
// Can be shared across threads safely
// Spawn multiple tasks with their own clones
let handles: Vec<_> = (0..10)
.map(|i| {
let mut svc = buffered.clone();
tokio::spawn(async move {
// Each task has its own clone
// All send to same underlying service
let response = svc.ready().await.unwrap().call(format!("request {}", i)).await; // reserve a slot, then call
println!("Task {} got: {:?}", i, response);
})
})
.collect();
// All tasks share the single underlying service
// Requests are processed sequentially by the service
// Key point: The service doesn't need to be Clone or Sync
// Buffer::new moves it into a tokio-spawned worker task (so the service
// itself must be Send; Buffer::pair lets you drive a !Send service
// yourself, e.g. on a local task)
}
Buffer isolates the service in a single worker task; only the cheap, thread-safe handles are shared.
Sequential Processing Semantics
use tower::buffer::Buffer;
use tower::Service;
async fn sequential_processing() {
// Important: Buffer makes clones share a SINGLE service instance
// This means requests are processed sequentially, NOT in parallel
struct SlowService;
impl Service<String> for SlowService {
// ... processes each request slowly ...
}
let slow = SlowService;
let buffered = Buffer::new(slow, 100);
// If we send 10 requests:
// Clone 1: sends requests A, B, C
// Clone 2: sends requests D, E, F
// Clone 3: sends requests G, H, I, J
// The worker drives the service ONE request AT A TIME, in arrival order:
// A -> B -> C -> D -> E -> F -> G -> H -> I -> J
// (Note: the worker serializes poll_ready/call; if the service does its
// work inside the returned future, that work is awaited by the caller
// and can still overlap with other requests)
// This is important for:
// 1. Services with internal mutable state
// 2. Services that must not be called concurrently
// 3. Services representing sequential resources
// If you need parallel processing, use a different pattern:
// - Clone the service itself (if possible)
// - Use a pool of services (tower::MakeService)
}
Buffer provides sequential processing, not parallelism.
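The one-at-a-time draining behavior can be sketched with a single worker loop over a channel (std stand-ins for the tokio pieces):

```rust
use std::sync::mpsc;
use std::thread;

// Requests from any number of senders are drained by ONE worker loop,
// so the service sees them strictly one at a time, in arrival order.
fn process_all(reqs: Vec<&'static str>) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<&'static str>();
    for r in reqs {
        tx.send(r).unwrap();
    }
    drop(tx); // close the channel so the worker loop terminates
    let worker = thread::spawn(move || {
        let mut log = Vec::new();
        for req in rx { // sequential: next request starts after this one
            log.push(format!("done: {req}"));
        }
        log
    });
    worker.join().unwrap()
}

fn main() {
    println!("{:?}", process_all(vec!["A", "B", "C"]));
    // prints: ["done: A", "done: B", "done: C"]
}
```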
When to Use Buffer
use tower::buffer::Buffer;
use std::net::SocketAddr;
// Use Buffer when:
// 1. Service cannot implement Clone
struct UniqueConnection {
handle: UniqueHandle, // Not Clone
}
// 2. Service must be shared across multiple handlers
async fn server() {
let svc = UniqueConnection::connect().await;
let buffered = Buffer::new(svc, 1024);
// axum, warp, tonic need Clone services
let app = make_app(buffered.clone());
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
// Server needs to clone service for each connection
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
}
// 3. Service represents a sequential resource
struct SerialPort {
port: Serial, // Must be accessed sequentially
}
// 4. Service has expensive Clone
struct ExpensiveService {
// If Clone would be expensive or impossible
pool: ConnectionPool, // Expensive to clone
}
Use Buffer when cloning is impossible or expensive, but sharing is required.
When NOT to Use Buffer
use tower::buffer::Buffer;
// DON'T use Buffer when:
// 1. Service already implements Clone efficiently
struct CheapClone {
data: Arc<Data>, // Arc makes Clone cheap
}
// Just clone it directly, no Buffer needed
// 2. Service can be recreated cheaply
struct StatelessService;
impl Clone for StatelessService {
fn clone(&self) -> Self {
StatelessService // No state, cheap to create
}
}
// Make multiple instances instead of sharing one
// 3. You need parallel processing
// Buffer processes requests sequentially
// For parallel processing, use service pools
// 4. Backpressure would be problematic
// If clients can't wait, Buffer's queuing may cause timeouts
// Alternative: Service pools for parallelism
use tower::MakeService;
// MakeService creates NEW service instances
// Each handler gets its own service (parallel execution)
Don't use Buffer when cloning is cheap or when parallelism is needed.
Buffer vs Clone Implementation
use tower::buffer::Buffer;
// Two approaches to making services cloneable:
// Approach 1: Implement Clone yourself
#[derive(Clone)]
struct ClonableService {
inner: Arc<Mutex<InnerService>>,
}
impl Service<Request> for ClonableService {
// Must acquire lock on every call
fn call(&mut self, req: Request) -> Self::Future {
let inner = self.inner.clone();
Box::pin(async move {
let mut guard = inner.lock().await;
guard.call(req).await
})
}
}
// Pros: Simple, no extra types
// Cons: Lock contention, manual implementation
// Approach 2: Use Buffer
let service = InnerService::new();
let buffered = Buffer::new(service, 1024);
// Pros: Built-in, proper backpressure, worker task handles locking
// Cons: Additional type wrapper, channel overhead
// Buffer is generally better because:
// - Handles backpressure with capacity
// - Worker task pattern is cleaner
// - Standard approach, well-tested
Buffer provides a robust implementation of the shared-service pattern.
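Approach 1 can be made concrete with std types: a sketch of the hand-rolled `Arc<Mutex<_>>` wrapper, using a blocking mutex instead of an async one for brevity. Every caller contends on the same lock, which is the contention Buffer replaces with a queue:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// The inner, non-Clone service.
struct Inner { calls: u64 }
impl Inner {
    fn call(&mut self) -> u64 {
        self.calls += 1;
        self.calls
    }
}

// Clones share an Arc; every call takes the mutex.
#[derive(Clone)]
struct Shared { inner: Arc<Mutex<Inner>> }
impl Shared {
    fn call(&self) -> u64 {
        self.inner.lock().unwrap().call()
    }
}

// n threads each call once through their own clone, then we call once
// more from the original handle: the shared counter reaches n + 1.
fn total_after(n: usize) -> u64 {
    let svc = Shared { inner: Arc::new(Mutex::new(Inner { calls: 0 })) };
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let s = svc.clone();
            thread::spawn(move || s.call())
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    svc.call()
}

fn main() {
    println!("{}", total_after(8)); // prints 9
}
```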
Error Handling and Closures
use tower::buffer::Buffer;
async fn error_handling() {
let service = MyService::new();
let buffered = Buffer::new(service, 1024);
// Buffer handles service errors gracefully
// If the service fails, the error propagates to caller
// If the worker task crashes:
// - Future calls return an error
// - The channel is closed
let mut svc = buffered.clone();
match svc.call(request).await {
Ok(response) => println!("Success: {:?}", response),
Err(e) => {
// Could be:
// - Service error
// - Service panicked
// - Channel closed (service dropped)
}
}
// Important: If you drop all clones, the worker exits
// The original service is dropped too
}
Buffer propagates errors and handles worker failures gracefully.
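The "channel closed" failure mode has a direct std analogue: once the receiving side (the worker) is gone, every remaining handle's send fails (a minimal sketch with hypothetical names):

```rust
use std::sync::mpsc;

// Simulates the worker task exiting (and dropping the service):
// the receiver is dropped, so a handle's send returns an error
// instead of hanging.
fn send_after_worker_gone() -> bool {
    let (tx, rx) = mpsc::channel::<&str>();
    drop(rx); // worker exited / service dropped
    tx.send("request").is_err()
}

fn main() {
    println!("send failed: {}", send_after_worker_gone());
    // prints: send failed: true
}
```

Buffer surfaces this condition as an error from the handle's call, rather than a panic.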
Integration with Tower Ecosystem
use tower::ServiceBuilder;
use tower::buffer::BufferLayer;
use tower::limit::{RateLimitLayer, ConcurrencyLimitLayer};
use tower::timeout::TimeoutLayer;
use tower::retry::RetryLayer;
use tower::load_shed::LoadShedLayer;
use std::time::Duration;
async fn middleware_stack() {
// Buffer plays nicely with other middleware
let service = MyNonCloneService::new();
let service = ServiceBuilder::new()
// LoadShed needs to clone the service
// So Buffer must come first
.layer(BufferLayer::new(1024))
// Now all these can clone the service
.layer(LoadShedLayer::new())
.layer(ConcurrencyLimitLayer::new(100))
.layer(RateLimitLayer::new(1000, Duration::from_secs(1)))
.layer(TimeoutLayer::new(Duration::from_secs(30)))
.layer(RetryLayer::new(RetryPolicy::new(3))) // RetryPolicy: a user-defined retry::Policy
.service(service);
// Order matters:
// Buffer must be early in the stack (closer to service)
// so outer layers can clone the wrapper
// Request flow:
// Request -> Retry -> Timeout -> RateLimit -> ConcurrencyLimit
// -> LoadShed -> Buffer -> MyService
}
Place BufferLayer early in the middleware stack so outer layers can clone.
Synthesis
Quick reference:
use tower::buffer::{Buffer, BufferLayer};
use tower::ServiceBuilder;
// Buffer: Makes non-Clone services cloneable
// Creates a channel-based wrapper where clones are just senders
// Basic usage:
let service = NonCloneService::new();
let buffered = Buffer::new(service, 1024);
// capacity: max pending requests before backpressure
// With ServiceBuilder:
let service = ServiceBuilder::new()
.layer(BufferLayer::new(1024))
.service(NonCloneService::new());
// What Buffer does:
// 1. Moves the service into a spawned worker task
// 2. The worker pulls requests from a bounded channel
// 3. Clones are channel senders
// 4. All clones share ONE service instance
// Key properties:
// - Clones are cheap (just channel senders)
// - Service processes requests sequentially
// - Backpressure when channel is full
// - Thread-safe (Send + Sync)
// When to use:
// - Service cannot implement Clone
// - Must share service across handlers
// - Sequential processing is acceptable
// - Server frameworks requiring Clone
// When NOT to use:
// - Service already Clone efficiently
// - Parallel processing needed (use pools)
// - Cloning is cheap (just clone)
// Capacity considerations:
// - Small (64-256): Low latency, more backpressure
// - Large (1024+): More queuing, higher throughput potential
// - Choose based on: request rate × processing time
Key insight: BufferLayer solves a fundamental tension in service-oriented design: services often need to be shared across multiple concurrent handlers, but implementing Clone can be expensive or impossible. The solution is to NOT clone the service at all. Instead, Buffer creates a single worker task that owns the service, and hands out "handles" that are just channel senders. Each handle implements Service by sending requests through the channel. This pattern provides serialized dispatch to the service (which many services require), backpressure through bounded channels, and thread safety without exposing the underlying service to concurrent access. The cost is indirection through channels and serialization of requests that might otherwise run in parallel.
