tower::buffer::Buffer and ServiceBuilder for middleware ordering?

tower::buffer::Buffer provides bounded buffering and backpressure for a service, executing queued requests in FIFO order once the service is ready, while ServiceBuilder is a middleware composition utility that constructs layered services in the order they are defined. Buffer affects when requests execute (queuing them until the inner service is ready), whereas ServiceBuilder affects how middleware wraps the inner service (determining the execution order of middleware layers). They serve different purposes: Buffer manages backpressure and shared, concurrent access to a service, while ServiceBuilder manages middleware composition and ordering. The two can be combined, with Buffer used as a layer within a ServiceBuilder pipeline.
use tower::{ServiceBuilder, Service, ServiceExt};
use tower::layer::Layer;
use std::task::{Context, Poll};
// Example middleware layers
struct LogLayer;
struct AuthLayer;
struct RateLimitLayer;
impl<S> Layer<S> for LogLayer {
type Service = LogService<S>;
fn layer(&self, inner: S) -> Self::Service {
LogService { inner }
}
}
struct LogService<S> {
inner: S,
}
impl<S, Request> Service<Request> for LogService<S>
where
S: Service<Request>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, req: Request) -> Self::Future {
println!("Logging request");
self.inner.call(req)
}
}
fn basic_service_builder() {
// ServiceBuilder layers middleware in order
let service = ServiceBuilder::new()
.layer(LogLayer) // Applied first (outermost)
.layer(AuthLayer) // Applied second
.layer(RateLimitLayer) // Applied third (innermost)
.service(inner_service);
// Request flow: LogLayer -> AuthLayer -> RateLimitLayer -> inner_service
// Response flow: inner_service -> RateLimitLayer -> AuthLayer -> LogLayer
}

ServiceBuilder layers wrap from outer to inner in definition order.
use tower::buffer::Buffer;
use tower::{Service, ServiceExt};
use std::sync::Arc;
fn basic_buffer() {
// Buffer wraps a service with a bounded queue
let inner_service = MyService::new();
// Create a buffer with capacity of 100 requests.
// Note: Buffer::new spawns a background worker task onto the
// tokio runtime, so it must be called from within a runtime context.
let buffered = Buffer::new(inner_service, 100);
// Buffer provides:
// 1. Bounded queue for incoming requests
// 2. Backpressure through poll_ready
// 3. Concurrency control
// When poll_ready returns Pending:
// - Requests queue in the buffer
// - When service becomes ready, queued requests execute in FIFO order
}

Buffer queues requests when the inner service isn't ready, executing them in order.
use tower::{ServiceBuilder, Service, ServiceExt};
use tower::layer::Layer;
fn ordering_example() {
// Order of .layer() calls determines wrapping order
let service = ServiceBuilder::new()
.layer(LayerA) // Outer layer (wraps everything)
.layer(LayerB) // Middle layer
.layer(LayerC) // Inner layer (closest to inner service)
.service(inner);
// Execution order for requests:
// Request -> LayerA -> LayerB -> LayerC -> inner
//
// Execution order for responses:
// Response <- LayerA <- LayerB <- LayerC <- inner
// LayerA sees the request first
// LayerC sees the request last (before inner service)
}

Layers defined earlier wrap layers defined later; the first is the outermost.
use tower::{ServiceBuilder, Service, ServiceExt};
use tower::buffer::BufferLayer;
fn buffer_in_service_builder() {
// Buffer can be used as a layer within ServiceBuilder
let service = ServiceBuilder::new()
.layer(LogLayer)
.layer(BufferLayer::new(100)) // Buffer with capacity 100
.layer(AuthLayer)
.service(inner_service);
// Request flow:
// 1. LogLayer processes request
// 2. Buffer queues if inner not ready
// 3. AuthLayer processes request
// 4. Inner service handles request
// The buffer's position affects where queuing occurs:
// - Before AuthLayer: requests queue before auth
// - After AuthLayer: auth happens, then queuing
}

BufferLayer integrates buffering into the middleware chain at a specific position.
use tower::{ServiceBuilder, Service};
use tower::buffer::Buffer;
fn ordering_control() {
// ServiceBuilder: Explicit ordering control
// You define the exact order of all middleware
let explicit_order = ServiceBuilder::new()
.layer(Middleware1)
.layer(Middleware2)
.layer(Middleware3)
.service(inner);
// Order is clear from code: 1 -> 2 -> 3 -> inner
// Buffer: Ordering is about request execution order
// Requests are queued and executed in FIFO order
let buffered = Buffer::new(inner, 100);
// No middleware ordering involved
// Just: queue -> inner (in order received)
// ServiceBuilder controls MIDDLEWARE order
// Buffer controls REQUEST EXECUTION order (when multiple requests queue)
}

ServiceBuilder controls middleware execution order; Buffer controls request execution order.
use tower::{ServiceBuilder, Service, ServiceExt};
use tower::buffer::Buffer;
fn concurrency_model() {
// ServiceBuilder: No built-in concurrency control
// Each layer simply delegates to the next; there is no queue between them
let layered = ServiceBuilder::new()
.layer(LogLayer)
.layer(AuthLayer)
.service(inner);
// poll_ready on layered service:
// - Calls poll_ready on each layer in order
// - If any layer isn't ready, whole service isn't ready
// Buffer: Built-in concurrency control
let buffered = Buffer::new(inner, 100);
// Buffer behavior:
// - poll_ready on buffer: checks if queue has room
// - Multiple callers can send requests
// - Requests queue when inner isn't ready
// - A worker task dispatches queued requests to inner in FIFO order
//   (the response futures may still complete concurrently)
// Use Buffer when:
// - Multiple producers sending requests
// - Need backpressure
// - Service may not always be ready
}

Buffer provides concurrency primitives; ServiceBuilder composes middleware without them.
use tower::{Service, ServiceExt};
use tower::buffer::Buffer;
async fn backpressure_example() {
// Without Buffer: backpressure propagates directly
let mut service = MyService::new();
// If service.poll_ready() returns Pending:
// - Caller must wait
// - No queuing happens
// With Buffer: backpressure is managed
let mut buffered = Buffer::new(MyService::new(), 100);
// If inner service.poll_ready() returns Pending:
// - Buffer's poll_ready may still return Ready (if queue has room)
// - Requests queue in buffer
// - When inner becomes ready, queued requests execute
// Buffer decouples:
// - Caller readiness (buffer capacity)
// - Inner service readiness (actual processing)
}

Buffer decouples caller readiness from inner service readiness through queuing.
use tower::{ServiceBuilder, Service, ServiceExt};
use tower::layer::Layer;
fn composability() {
// ServiceBuilder can be reused and composed
let common_layers = ServiceBuilder::new()
.layer(LogLayer)
.layer(MetricsLayer);
// Apply to different services
let service_a = common_layers.clone()
.layer(AuthLayer)
.service(service_a_inner);
let service_b = common_layers.clone()
.layer(RateLimitLayer)
.service(service_b_inner);
// ServiceBuilder also supports optional layers via option_layer,
// which takes an Option<Layer> and wraps the service in Either
let conditional = ServiceBuilder::new()
.option_layer(config.logging.then(|| LogLayer))
.layer(AuthLayer)
.service(inner);
// Buffer doesn't compose this way
// It's a single wrapper around one service
}

ServiceBuilder enables reusable middleware configurations; Buffer is a single-purpose wrapper.
use tower::buffer::Buffer;
use tower::{Service, ServiceExt};
use std::sync::Arc;
fn concurrent_system() {
// Scenario: Multiple tasks sending requests to one service
// Without Buffer:
// - Each task must wait for service to be ready
// - No queuing between tasks
// - Potential for contention and poor utilization
// With Buffer:
// - Shared buffer allows multiple tasks to queue requests
// - Inner service processes from queue
// - Backpressure when queue is full
let service = MyService::new();
// No Arc needed: Buffer itself provides shared access to the service
let buffered = Buffer::new(service, 100);
// Clone buffer for each task (cheap: just increments Arc count)
let buffer_clone1 = buffered.clone();
let buffer_clone2 = buffered.clone();
// Both tasks can send requests
// Requests queue in the shared buffer
// Inner service processes in order
}

Buffer enables multiple producers to share a service with ordered execution.
use tower::{ServiceBuilder, Service, ServiceExt};
use tower::buffer::BufferLayer;
use tower::limit::concurrency::ConcurrencyLimitLayer;
fn combined_approach() {
// Common pattern: use both together
let service = ServiceBuilder::new()
// Outer layers: applied to every request
.layer(LogLayer)
.layer(MetricsLayer)
// Buffer layer: queue requests before expensive layers
.layer(BufferLayer::new(100))
// Concurrency limit: limit concurrent operations
.layer(ConcurrencyLimitLayer::new(10))
// Inner layers: applied after queuing
.layer(AuthLayer)
.layer(RateLimitLayer)
.service(inner_service);
// Request flow:
// 1. Log request
// 2. Record metrics
// 3. Queue in buffer (if inner busy)
// 4. Concurrency limit
// 5. Auth check
// 6. Rate limit check
// 7. Inner service
// Buffer position matters:
// - Before auth: unauthenticated requests queue
// - After auth: only authenticated requests queue
}

BufferLayer integrates buffering into ServiceBuilder pipelines at specific positions.
use tower::buffer::Buffer;
use tower::{ServiceBuilder, Service};
fn performance_comparison() {
// ServiceBuilder: Zero-cost abstraction
// - Layers compose at compile time
// - No runtime allocation for middleware
// - Inlined calls between layers
let layered = ServiceBuilder::new()
.layer(LogLayer)
.layer(AuthLayer)
.service(inner);
// Compile-time composition, no heap allocation
// Buffer: Runtime overhead
// - Queue allocation for pending requests
// - Arc for shared state
// - Channel for communication
let buffered = Buffer::new(inner, 100);
// Allocates: queue, Arc, channel
// Use ServiceBuilder when:
// - You need middleware composition
// - Zero overhead is important
// - No concurrency needed
// Use Buffer when:
// - Multiple callers share a service
// - Backpressure is needed
// - Queuing is beneficial
}

ServiceBuilder is zero-cost composition; Buffer has runtime overhead for queuing.
use tower::buffer::Buffer;
use tower::{Service, ServiceExt};
fn cloneable_services() {
// Problem: Services often aren't Clone
// But you need multiple handles to the same service
// Buffer makes the service cloneable:
// The buffer handle is cheap to clone
// All clones share the same underlying queue
let inner = MyService::new(); // Doesn't implement Clone
let buffered = Buffer::new(inner, 100);
// Now buffered implements Clone
let handle1 = buffered.clone();
let handle2 = buffered.clone();
// Both handles send to same queue
// Inner service processes from single queue
// ServiceBuilder doesn't provide this
// It wraps layers, doesn't make services cloneable
}

Buffer makes non-Clone services cloneable through shared state; ServiceBuilder doesn't.
use tower::buffer::Buffer;
use tower::{Service, ServiceExt};
use tower::ServiceBuilder;
fn error_handling() {
// ServiceBuilder: Errors propagate through layers
// Each layer can transform or add context to errors
let layered = ServiceBuilder::new()
.layer(LogLayer) // May log errors
.layer(AuthLayer) // May produce auth errors
.layer(RateLimitLayer) // May produce rate limit errors
.service(inner);
// Errors flow: inner -> RateLimit -> Auth -> Log
// Buffer: Two types of errors
// 1. Errors from inner service
// 2. Errors from buffer itself (e.g., service closed)
let buffered = Buffer::new(inner, 100);
// Buffer's error type is BoxError (Box<dyn Error + Send + Sync>):
// - a "worker closed" error if the buffer's worker task is gone
// - inner service errors, boxed and passed through
// Buffer erases the concrete error type but doesn't otherwise
// transform errors
}

ServiceBuilder layers can transform errors; Buffer propagates or adds service-closed errors.
use tower::buffer::{Buffer, BufferLayer};
use tower::{ServiceBuilder, Service, ServiceExt};
fn usage_guidelines() {
// Use ServiceBuilder when:
// 1. Composing multiple middleware layers
// 2. Need explicit ordering control
// 3. Building reusable middleware pipelines
// 4. Want zero-cost abstraction
let with_middleware = ServiceBuilder::new()
.layer(LogLayer)
.layer(AuthLayer)
.layer(CacheLayer)
.service(inner);
// Use Buffer when:
// 1. Multiple tasks share one service
// 2. Need backpressure for overload protection
// 3. Service may have variable readiness
// 4. Need ordered request processing
let with_backpressure = Buffer::new(inner, 100);
// Use both when:
// 1. You have middleware AND need concurrency control
// 2. Buffer position within middleware matters
let combined = ServiceBuilder::new()
.layer(LogLayer)
.layer(BufferLayer::new(100)) // Position matters
.layer(AuthLayer)
.service(inner);
}

Use ServiceBuilder for composition, Buffer for concurrency control, or both together.
use tower::{ServiceBuilder, ServiceExt};
use tower::buffer::BufferLayer;
use tower::limit::concurrency::ConcurrencyLimitLayer;
fn pitfalls() {
// Pitfall 1: Buffer before auth
// Unauthenticated requests queue up, wasting resources
let bad1 = ServiceBuilder::new()
.layer(BufferLayer::new(100)) // Queues unauthenticated!
.layer(AuthLayer) // Auth happens after queuing
.service(inner);
// Better: Auth before buffer
let better1 = ServiceBuilder::new()
.layer(AuthLayer) // Auth happens immediately
.layer(BufferLayer::new(100)) // Only queue authenticated
.service(inner);
// Pitfall 2: Multiple buffers in chain
// Unnecessary overhead, confusing ordering
let bad2 = ServiceBuilder::new()
.layer(BufferLayer::new(100))
.layer(LogLayer)
.layer(BufferLayer::new(100)) // Second buffer unnecessary
.service(inner);
// Pitfall 3: Buffer after rate limiting
// Rate-limited requests queue instead of being rejected
let bad3 = ServiceBuilder::new()
.layer(BufferLayer::new(100)) // Queues rate-limited requests
.layer(RateLimitLayer) // Rate limit checked after queue
.service(inner);
// Better: Rate limit before buffer
let better3 = ServiceBuilder::new()
.layer(RateLimitLayer) // Reject immediately if over limit
.layer(BufferLayer::new(100)) // Queue only allowed requests
.service(inner);
}

Buffer position within ServiceBuilder affects when queuing happens relative to other middleware.
Purpose comparison:
// ServiceBuilder: Middleware composition
// - Defines order of layers
// - Zero-cost abstraction
// - Compile-time composition
let layered = ServiceBuilder::new()
.layer(Layer1)
.layer(Layer2)
.service(inner);
// Buffer: Concurrency control
// - Queues requests
// - Provides backpressure
// - Enables shared access
let buffered = Buffer::new(inner, capacity);

Key trade-offs:
| Aspect | ServiceBuilder | Buffer |
|--------|---------------|--------|
| Purpose | Middleware composition | Concurrency control |
| Ordering | Middleware execution order | Request execution order |
| Overhead | Zero-cost (compile-time) | Runtime (queue, Arc) |
| Backpressure | Propagates through layers | Manages with queue |
| Cloneability | Depends on inner service | Makes service cloneable |
| Reusability | Layers reusable across services | Single service wrapper |
Common patterns:
// Pattern 1: ServiceBuilder for middleware only
let service = ServiceBuilder::new()
.layer(LogLayer)
.layer(AuthLayer)
.layer(CacheLayer)
.service(inner);
// Pattern 2: Buffer for concurrency only
let service = Buffer::new(inner, 100);
// Pattern 3: Combined with intentional buffer position
let service = ServiceBuilder::new()
.layer(LogLayer) // Log all requests
.layer(RateLimitLayer) // Reject over-limit early
.layer(AuthLayer) // Authenticate before queuing
.layer(BufferLayer::new(100)) // Queue authenticated requests
.layer(ConcurrencyLimitLayer::new(10)) // Limit concurrent work
    .service(inner);

Key insight: ServiceBuilder and Buffer solve different problems: ServiceBuilder composes middleware with explicit ordering control, while Buffer manages request queuing and concurrency. They're complementary: ServiceBuilder determines what middleware runs and in what order, while Buffer determines when queued requests execute (FIFO order when the inner service is ready). The position of BufferLayer within a ServiceBuilder pipeline significantly affects behavior: placing it before auth or rate limiting means unauthenticated or rate-limited requests still queue, while placing it after means only validated requests use queue capacity. For most services, use ServiceBuilder for middleware composition and add BufferLayer when you need backpressure or shared service access.