Loading page…
Rust walkthroughs
Loading page…
The tower crate provides a modular service abstraction and middleware ecosystem for Rust. It's the foundation for middleware in frameworks like axum, tonic, and warp. Tower's key insight is that both services (handlers) and middleware share the same interface—the Service trait. This enables composing layers of middleware around services, creating powerful processing pipelines. Tower decouples middleware from specific frameworks, making them reusable across any tower-compatible system.
Key concepts:
# Cargo.toml
[dependencies]
tower = { version = "0.4", features = ["full"] }
tokio = { version = "1", features = ["full"] }
http = "1.0"
http-body-util = "0.1"
use tower::{Service, ServiceBuilder, ServiceExt};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
// A simple service that echoes the request body
/// Stateless service: every request is answered with a response that carries
/// the request's own body.
#[derive(Clone)]
struct EchoService;

impl Service<http::Request<String>> for EchoService {
    type Response = http::Response<String>;
    type Error = std::convert::Infallible;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    /// Reports readiness unconditionally.
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Moves the request body into a new response.
    fn call(&mut self, req: http::Request<String>) -> Self::Future {
        Box::pin(async move {
            // Only the body is echoed; the request head is dropped.
            let (_head, body) = req.into_parts();
            Ok(http::Response::new(body))
        })
    }
}
#[tokio::main]
async fn main() {
let mut service = EchoService;
let request = http::Request::new("Hello, Tower!".to_string());
let response = service.ready().await.unwrap().call(request).await.unwrap();
println!("Response: {}", response.into_body());
}use tower::{service_fn, Service, ServiceExt};
#[tokio::main]
async fn main() {
// Create a service from a closure
let mut service = service_fn(|req: http::Request<String>| async move {
Ok::<_, std::convert::Infallible>(http::Response::new(req.into_body()))
});
let request = http::Request::new("Hello!".to_string());
let response = service.ready().await.unwrap().call(request).await.unwrap();
println!("Response: {}", response.into_body());
}use tower::{Layer, Service, ServiceExt};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
// The middleware layer
/// Layer that wraps a service in [`LogMiddleware`].
#[derive(Clone)]
pub struct LogLayer;

impl<S> Layer<S> for LogLayer {
    type Service = LogMiddleware<S>;

    /// Wraps `service` so its calls are logged.
    fn layer(&self, service: S) -> Self::Service {
        LogMiddleware { inner: service }
    }
}
// The middleware itself
/// Service wrapper that prints a line before forwarding a request and another
/// after the wrapped service returns successfully.
#[derive(Clone)]
pub struct LogMiddleware<S> {
    inner: S,
}

impl<S, Request> Service<Request> for LogMiddleware<S>
where
    S: Service<Request>,
    S::Future: Send + 'static,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    /// Readiness is delegated to the wrapped service.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    /// Logs, forwards the request, and logs again on success.
    fn call(&mut self, req: Request) -> Self::Future {
        println!("[LOG] Processing request");
        let pending = self.inner.call(req);
        Box::pin(async move {
            let outcome = pending.await;
            // The second line is printed only when the inner call succeeded.
            if outcome.is_ok() {
                println!("[LOG] Response sent");
            }
            outcome
        })
    }
}
#[tokio::main]
async fn main() {
    // Wrap a trivial service in the logging middleware and call it once.
    let greet = |_: ()| async { Ok::<_, std::convert::Infallible>("Hello from service!") };
    let mut service = LogLayer.layer(tower::service_fn(greet));

    let ready = service.ready().await.unwrap();
    let response = ready.call(()).await.unwrap();
    println!("Result: {}", response);
}
use tower::{Layer, Service, ServiceExt};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Layer holding a request counter that it hands to every service it wraps.
#[derive(Clone)]
struct CounterLayer {
    count: Arc<AtomicU64>,
}

impl CounterLayer {
    /// Creates a layer whose counter starts at zero.
    fn new() -> Self {
        let count = Arc::new(AtomicU64::new(0));
        Self { count }
    }
}

impl<S> Layer<S> for CounterLayer {
    type Service = CounterMiddleware<S>;

    /// Wraps `service`, giving it a handle to this layer's counter.
    fn layer(&self, service: S) -> Self::Service {
        let count = Arc::clone(&self.count);
        CounterMiddleware { inner: service, count }
    }
}
/// Service wrapper that numbers each request using a shared atomic counter.
#[derive(Clone)]
struct CounterMiddleware<S> {
    inner: S,
    count: Arc<AtomicU64>,
}

impl<S, Request> Service<Request> for CounterMiddleware<S>
where
    S: Service<Request>,
    S::Future: Send + 'static,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    /// Readiness is delegated to the wrapped service.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    /// Bumps the counter, prints the 1-based request number, then forwards.
    fn call(&mut self, req: Request) -> Self::Future {
        // `fetch_add` yields the value before the increment.
        let previous = self.count.fetch_add(1, Ordering::Relaxed);
        println!("Request #{}", previous + 1);
        let pending = self.inner.call(req);
        Box::pin(pending)
    }
}
#[tokio::main]
async fn main() {
    let done = |_: ()| async { Ok::<_, std::convert::Infallible>("Done") };
    let mut service = CounterLayer::new().layer(tower::service_fn(done));

    // Make multiple requests
    let mut remaining = 3;
    while remaining > 0 {
        let ready = service.ready().await.unwrap();
        ready.call(()).await.unwrap();
        remaining -= 1;
    }
}
use tower::{Layer, Service, ServiceExt};
use std::time::Instant;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Layer that wraps a service in [`TimingMiddleware`].
#[derive(Clone)]
struct TimingLayer;

impl<S> Layer<S> for TimingLayer {
    type Service = TimingMiddleware<S>;

    /// Wraps `service` so its calls are timed.
    fn layer(&self, service: S) -> Self::Service {
        TimingMiddleware { inner: service }
    }
}
/// Service wrapper that prints the request and how long the wrapped call took.
#[derive(Clone)]
struct TimingMiddleware<S> {
    inner: S,
}

impl<S, Request> Service<Request> for TimingMiddleware<S>
where
    S: Service<Request>,
    S::Future: Send + 'static,
    Request: std::fmt::Debug + Send + 'static,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    /// Readiness is delegated to the wrapped service.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    /// Starts a timer, logs the request, forwards it, and logs the elapsed
    /// time once the inner future resolves (success or error alike).
    fn call(&mut self, req: Request) -> Self::Future {
        let started_at = Instant::now();
        println!("[START] Processing: {:?}", req);
        let pending = self.inner.call(req);
        Box::pin(async move {
            let outcome = pending.await;
            println!("[END] Took {:?}", started_at.elapsed());
            outcome
        })
    }
}
#[tokio::main]
async fn main() {
    // The inner service sleeps briefly so the timing output is non-trivial.
    let slow = |req: &'static str| async move {
        let pause = std::time::Duration::from_millis(100);
        tokio::time::sleep(pause).await;
        Ok::<_, std::convert::Infallible>(format!("Processed: {}", req))
    };
    let mut service = TimingLayer.layer(tower::service_fn(slow));

    let ready = service.ready().await.unwrap();
    let response = ready.call("hello").await.unwrap();
    println!("Response: {}", response);
}
use tower::{ServiceBuilder, Layer, Service, ServiceExt};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
// Middleware 1: Logging
/// Layer that wraps a service in [`LogMiddleware`].
#[derive(Clone)]
struct LogLayer;

impl<S> Layer<S> for LogLayer {
    type Service = LogMiddleware<S>;

    /// Wraps `service` so each request is announced.
    fn layer(&self, service: S) -> Self::Service {
        LogMiddleware { inner: service }
    }
}
/// Service wrapper that prints a line for every request it forwards.
#[derive(Clone)]
struct LogMiddleware<S> {
    inner: S,
}

impl<S, Request> Service<Request> for LogMiddleware<S>
where
    S: Service<Request>,
    S::Future: Send + 'static,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    /// Readiness is delegated to the wrapped service.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    /// Logs the arrival, then hands the request to the wrapped service.
    fn call(&mut self, req: Request) -> Self::Future {
        println!("[LOG] Request received");
        let pending = self.inner.call(req);
        Box::pin(pending)
    }
}
// Middleware 2: Timing
/// Layer that wraps a service in [`TimingMiddleware`].
#[derive(Clone)]
struct TimingLayer;

impl<S> Layer<S> for TimingLayer {
    type Service = TimingMiddleware<S>;

    /// Wraps `service` so its calls are timed.
    fn layer(&self, service: S) -> Self::Service {
        TimingMiddleware { inner: service }
    }
}
/// Service wrapper that prints how long each forwarded call took.
#[derive(Clone)]
struct TimingMiddleware<S> {
    inner: S,
}

impl<S, Request> Service<Request> for TimingMiddleware<S>
where
    S: Service<Request>,
    S::Future: Send + 'static,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    /// Readiness is delegated to the wrapped service.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    /// Forwards the request and prints the elapsed time when the inner
    /// future resolves, whatever its outcome.
    fn call(&mut self, req: Request) -> Self::Future {
        let started_at = std::time::Instant::now();
        let pending = self.inner.call(req);
        Box::pin(async move {
            let outcome = pending.await;
            let elapsed = started_at.elapsed();
            println!("[TIMING] {:?}", elapsed);
            outcome
        })
    }
}
#[tokio::main]
async fn main() {
    let hello = |_: ()| async {
        let pause = std::time::Duration::from_millis(50);
        tokio::time::sleep(pause).await;
        Ok::<_, std::convert::Infallible>("Hello!")
    };
    let inner = tower::service_fn(hello);

    // Chain middleware with ServiceBuilder
    let stack = ServiceBuilder::new().layer(LogLayer).layer(TimingLayer);
    let mut service = stack.service(inner);

    let ready = service.ready().await.unwrap();
    let response = ready.call(()).await.unwrap();
    println!("Response: {}", response);
}
use tower::{Layer, Service, ServiceExt};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Layer that wraps a service in [`UppercaseMiddleware`].
#[derive(Clone)]
struct UppercaseLayer;

impl<S> Layer<S> for UppercaseLayer {
    type Service = UppercaseMiddleware<S>;

    /// Wraps `service` so requests are upper-cased and responses tagged.
    fn layer(&self, service: S) -> Self::Service {
        UppercaseMiddleware { inner: service }
    }
}
/// Service wrapper for `String` requests: upper-cases the request before
/// forwarding it and prefixes a successful response with `[UPPER] `.
#[derive(Clone)]
struct UppercaseMiddleware<S> {
    inner: S,
}

impl<S> Service<String> for UppercaseMiddleware<S>
where
    S: Service<String, Response = String>,
    S::Future: Send + 'static,
{
    type Response = String;
    type Error = S::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    /// Readiness is delegated to the wrapped service.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    /// Transforms the request, forwards it, then transforms the response.
    fn call(&mut self, req: String) -> Self::Future {
        // Transform request
        let shouted = req.to_uppercase();
        println!("Transformed: {}", shouted);
        let pending = self.inner.call(shouted);
        Box::pin(async move {
            // Transform response; errors pass through untouched.
            pending
                .await
                .map(|response| format!("[UPPER] {}", response))
        })
    }
}
#[tokio::main]
async fn main() {
    let echo = |req: String| async move {
        Ok::<_, std::convert::Infallible>(format!("Echo: {}", req))
    };
    let mut service = UppercaseLayer.layer(tower::service_fn(echo));

    let ready = service.ready().await.unwrap();
    let response = ready.call(String::from("hello")).await.unwrap();
    println!("Response: {}", response);
}
use tower::{Layer, Service, ServiceExt};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Error type produced by the inner service in this example.
///
/// `ErrorHandlingMiddleware` formats these values with `{:?}` when turning
/// them into `String` errors, which is why `Debug` is derived.
#[derive(Debug)]
enum MyError {
    /// Failure carrying a free-form message.
    Service(String),
    /// Unit variant; not constructed anywhere in this example.
    Timeout,
}
/// Layer that wraps a service in [`ErrorHandlingMiddleware`].
#[derive(Clone)]
struct ErrorHandlingLayer;

impl<S> Layer<S> for ErrorHandlingLayer {
    type Service = ErrorHandlingMiddleware<S>;

    /// Wraps `service` so its `MyError`s surface as strings.
    fn layer(&self, service: S) -> Self::Service {
        ErrorHandlingMiddleware { inner: service }
    }
}
/// Service wrapper that converts the inner service's `MyError` into a
/// `String`, tagging whether it came from readiness or from the call.
#[derive(Clone)]
struct ErrorHandlingMiddleware<S> {
    inner: S,
}

impl<S, Request> Service<Request> for ErrorHandlingMiddleware<S>
where
    S: Service<Request, Error = MyError>,
    S::Future: Send + 'static,
{
    type Response = S::Response;
    type Error = String; // Convert errors to strings
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    /// Delegates readiness, rewriting an error as `"Ready error: ..."`.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner
            .poll_ready(cx)
            .map_err(|e| format!("Ready error: {:?}", e))
    }

    /// Forwards the request, rewriting an error as `"Call error: ..."`.
    fn call(&mut self, req: Request) -> Self::Future {
        let pending = self.inner.call(req);
        Box::pin(async move {
            match pending.await {
                Ok(response) => Ok(response),
                Err(e) => Err(format!("Call error: {:?}", e)),
            }
        })
    }
}
#[tokio::main]
async fn main() {
    // The inner service always fails, so the middleware's conversion is exercised.
    let failing = |_: ()| async {
        let cause = MyError::Service(String::from("Something failed"));
        Err::<String, _>(cause)
    };
    let mut service = ErrorHandlingLayer.layer(tower::service_fn(failing));

    let outcome = service.ready().await.unwrap().call(()).await;
    match outcome {
        Err(e) => println!("Error: {}", e),
        Ok(response) => println!("Success: {}", response),
    }
}
use tower::{Layer, Service, ServiceExt};
use std::time::Duration;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Layer that applies a fixed time limit to every call of the wrapped service.
#[derive(Clone)]
struct TimeoutLayer {
    duration: Duration,
}

impl TimeoutLayer {
    /// Creates a layer enforcing `duration` per call.
    fn new(duration: Duration) -> Self {
        TimeoutLayer { duration }
    }
}

impl<S> Layer<S> for TimeoutLayer {
    type Service = TimeoutMiddleware<S>;

    /// Wraps `service`, copying this layer's time limit into the wrapper.
    fn layer(&self, service: S) -> Self::Service {
        let duration = self.duration;
        TimeoutMiddleware { inner: service, duration }
    }
}
/// Service wrapper that fails a call with a `String` error if the wrapped
/// service does not finish within `duration`.
#[derive(Clone)]
struct TimeoutMiddleware<S> {
    inner: S,
    duration: Duration,
}

impl<S, Request> Service<Request> for TimeoutMiddleware<S>
where
    S: Service<Request>,
    S::Future: Send + 'static,
    S::Error: Send + 'static,
{
    type Response = S::Response;
    type Error = String;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    /// Delegates readiness; any inner error is reported as `"Ready error"`.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        match self.inner.poll_ready(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
            Poll::Ready(Err(_)) => Poll::Ready(Err("Ready error".to_string())),
        }
    }

    /// Runs the inner call under `tokio::time::timeout`.
    ///
    /// An elapsed timer yields `"Timeout after ..."`; an inner failure yields
    /// `"Service error"` (the original error value is discarded).
    fn call(&mut self, req: Request) -> Self::Future {
        let limit = self.duration;
        let pending = self.inner.call(req);
        Box::pin(async move {
            match tokio::time::timeout(limit, pending).await {
                Err(_elapsed) => Err(format!("Timeout after {:?}", limit)),
                Ok(Err(_)) => Err("Service error".to_string()),
                Ok(Ok(response)) => Ok(response),
            }
        })
    }
}
#[tokio::main]
async fn main() {
    // The inner service sleeps far longer than the 100 ms limit applied below.
    let sluggish = |_: ()| async {
        tokio::time::sleep(std::time::Duration::from_secs(10)).await;
        Ok::<_, std::convert::Infallible>("Done")
    };
    let limit = Duration::from_millis(100);
    let mut service = TimeoutLayer::new(limit).layer(tower::service_fn(sluggish));

    let outcome = service.ready().await.unwrap().call(()).await;
    match outcome {
        Err(e) => println!("Error: {}", e),
        Ok(response) => println!("Success: {}", response),
    }
}
use tower::{Layer, Service, ServiceExt};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Layer configuring how many requests per second a wrapped service accepts.
#[derive(Clone)]
struct RateLimitLayer {
    requests_per_second: u64,
}

impl RateLimitLayer {
    /// Creates a layer with the given per-second budget.
    fn new(requests_per_second: u64) -> Self {
        RateLimitLayer { requests_per_second }
    }
}

/// Window bookkeeping used by `RateLimitMiddleware`.
struct RateLimitState {
    /// Requests counted since `last_reset`.
    count: AtomicU64,
    /// Instant at which the current window began.
    last_reset: std::sync::Mutex<Instant>,
}

impl<S> Layer<S> for RateLimitLayer {
    type Service = RateLimitMiddleware<S>;

    /// Wraps `service` with a fresh window state; each call to `layer`
    /// creates its own counter starting at zero.
    fn layer(&self, service: S) -> Self::Service {
        let fresh = RateLimitState {
            count: AtomicU64::new(0),
            last_reset: std::sync::Mutex::new(Instant::now()),
        };
        RateLimitMiddleware {
            inner: service,
            limit: self.requests_per_second,
            state: Arc::new(fresh),
        }
    }
}
/// Service wrapper that rejects calls once `limit` requests have been counted
/// inside the current one-second window.
///
/// Clones share the same window state through the `Arc`.
#[derive(Clone)]
struct RateLimitMiddleware<S> {
    inner: S,
    limit: u64,
    state: Arc<RateLimitState>,
}

impl<S, Request> Service<Request> for RateLimitMiddleware<S>
where
    S: Service<Request>,
    S::Future: Send + 'static,
{
    type Response = S::Response;
    type Error = String;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    /// Delegates readiness; any inner error is reported as `"Service error"`.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx).map_err(|_| "Service error".to_string())
    }

    /// Counts the request against the current window and either forwards it
    /// or fails with `"Rate limit exceeded (N req/s)"`.
    ///
    /// # Panics
    ///
    /// Panics if the window mutex has been poisoned.
    fn call(&mut self, req: Request) -> Self::Future {
        // Copy the limit out up front: the boxed future is `'static`, so it
        // must not capture the `&mut self` borrow.
        let limit = self.limit;

        // Check rate limit
        let admitted = {
            let mut last_reset = self
                .state
                .last_reset
                .lock()
                .expect("rate-limit window mutex poisoned");
            let now = Instant::now();
            if now.duration_since(*last_reset) >= Duration::from_secs(1) {
                *last_reset = now;
                self.state.count.store(0, Ordering::Relaxed);
            }
            // Count while the lock is still held so that a window reset from
            // another clone cannot slip in between the reset check and the
            // increment.
            self.state.count.fetch_add(1, Ordering::Relaxed) < limit
        };

        if !admitted {
            return Box::pin(async move {
                Err(format!("Rate limit exceeded ({} req/s)", limit))
            });
        }

        let future = self.inner.call(req);
        Box::pin(async move {
            future.await.map_err(|_| "Service error".to_string())
        })
    }
}
#[tokio::main]
async fn main() {
    let handler = |i: u64| async move {
        Ok::<_, std::convert::Infallible>(format!("Request {} processed", i))
    };
    // Budget of 3 requests per second, then 5 back-to-back calls.
    let mut service = RateLimitLayer::new(3).layer(tower::service_fn(handler));

    for i in 0..5 {
        let outcome = service.ready().await.unwrap().call(i).await;
        match outcome {
            Err(e) => println!("Error: {}", e),
            Ok(response) => println!("Success: {}", response),
        }
    }
}
use tower::{Layer, Service, ServiceExt};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
#[derive(Clone)]
struct RetryLayer {
max_retries: u32,
}
impl RetryLayer {
fn new(max_retries: u32) -> Self {
Self { max_retries }
}
}
impl<S> Layer<S> for RetryLayer {
type Service = RetryMiddleware<S>;
fn layer(&self, inner: S) -> Self::Service {
RetryMiddleware {
inner,
max_retries: self.max_retries,
attempt: Arc::new(AtomicU32::new(0)),
}
}
}
#[derive(Clone)]
struct RetryMiddleware<S> {
inner: S,
max_retries: u32,
attempt: Arc<AtomicU32>,
}
impl<S, Request> Service<Request> for RetryMiddleware<S>
where
S: Service<Request, Error = String> + Clone,
S::Future: Send + 'static,
Request: Clone + Send + 'static,
{
type Response = S::Response;
type Error = String;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, req: Request) -> Self::Future {
let attempt = self.attempt.fetch_add(1, Ordering::Relaxed);
let max_retries = self.max_retries;
let mut inner = self.inner.clone();
Box::pin(async move {
let mut current_attempt = attempt;
loop {
match inner.call(req.clone()).await {
Ok(response) => {
println!("Success on attempt {}", current_attempt + 1);
return Ok(response);
}
Err(e) if current_attempt < max_retries => {
println!("Attempt {} failed: {}. Retrying...", current_attempt + 1, e);
current_attempt += 1;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
Err(e) => {
return Err(format!("All {} attempts failed. Last error: {}", max_retries + 1, e));
}
}
}
})
}
}
#[tokio::main]
async fn main() {
    // Shared call counter: the first two calls fail, later ones succeed.
    let calls = Arc::new(AtomicU32::new(0));
    let flaky = move |_: ()| {
        let calls = Arc::clone(&calls);
        async move {
            let seen = calls.fetch_add(1, Ordering::Relaxed);
            if seen >= 2 {
                Ok::<_, String>("Success!")
            } else {
                Err("Transient error".to_string())
            }
        }
    };
    let mut service = RetryLayer::new(3).layer(tower::service_fn(flaky));

    let outcome = service.ready().await.unwrap().call(()).await;
    match outcome {
        Err(e) => println!("Error: {}", e),
        Ok(response) => println!("Result: {}", response),
    }
}
use tower::{ServiceBuilder, Service, ServiceExt};
use tower::limit::concurrency::ConcurrencyLimitLayer;
use std::time::Duration;
#[tokio::main]
async fn main() {
    let handler = |req: &'static str| async move {
        tokio::time::sleep(Duration::from_millis(100)).await;
        Ok::<_, std::convert::Infallible>(format!("Processed: {}", req))
    };
    let inner = tower::service_fn(handler);

    // Using tower's built-in layers
    let stack = ServiceBuilder::new()
        // Limit to 2 concurrent requests
        .layer(ConcurrencyLimitLayer::new(2))
        // Add timeout
        .layer(tower::timeout::TimeoutLayer::new(Duration::from_secs(5)));
    let mut service = stack.service(inner);

    let ready = service.ready().await.unwrap();
    let response = ready.call("hello").await.unwrap();
    println!("Response: {}", response);
}
use tower::{Service, ServiceExt, discover::ServiceList};
#[tokio::main]
async fn main() {
    // A `Vec` needs a single element type, and every closure literal has its
    // own distinct type, so both services are produced by one factory closure.
    let named = |name: &'static str| {
        tower::service_fn(move |_: ()| async move {
            Ok::<_, std::convert::Infallible>(name)
        })
    };
    let services = vec![named("Service A"), named("Service B")];
    let count = services.len();

    // Create a service list for discovery
    let _discover = ServiceList::new(services);
    println!("Service discovery created with {} services", count);
}
// This example shows how tower middleware integrates with axum
// Run it with: cargo run --features axum
/*
use axum::{
routing::get,
Router,
};
use tower::ServiceBuilder;
use std::time::Duration;
async fn handler() -> &'static str {
"Hello!"
}
#[tokio::main]
async fn main() {
let app = Router::new()
.route("/", get(handler))
.layer(
ServiceBuilder::new()
.layer(tower::limit::ConcurrencyLimitLayer::new(100))
.layer(tower::timeout::TimeoutLayer::new(Duration::from_secs(30)))
);
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
}
*/
// Placeholder entry point: the axum example above is commented out, so this
// only prints a hint.
fn main() {
    let hint = "Uncomment the code and add axum to Cargo.toml to run with axum";
    println!("{}", hint);
}
use tower::{ServiceBuilder, Service, ServiceExt};
use tower::buffer::BufferLayer;
#[tokio::main]
async fn main() {
    let doubler = |req: u32| async move {
        let pause = std::time::Duration::from_millis(50);
        tokio::time::sleep(pause).await;
        Ok::<_, std::convert::Infallible>(req * 2)
    };
    let inner = tower::service_fn(doubler);

    // Buffer allows the service to handle requests asynchronously
    // and provides backpressure when buffer is full
    let stack = ServiceBuilder::new().layer(BufferLayer::new(10));
    let mut service = stack.service(inner);

    let ready = service.ready().await.unwrap();
    let response = ready.call(5).await.unwrap();
    println!("Response: {}", response);
}
use tower::{Service, ServiceExt};
// Simple round-robin load balancing example
/// Holds a list of backends and a cursor that selects the next one to use.
struct LoadBalancer<S> {
    /// Backends, in the order they are cycled through.
    services: Vec<S>,
    /// Number of calls dispatched so far; reduced modulo the backend count
    /// to pick a backend.
    current: std::sync::atomic::AtomicUsize,
}

impl<S> LoadBalancer<S> {
    /// Creates a balancer over `services` with the cursor at zero.
    fn new(services: Vec<S>) -> Self {
        let current = std::sync::atomic::AtomicUsize::default();
        LoadBalancer { services, current }
    }
}
impl<S, Request> Service<Request> for LoadBalancer<S>
where
    S: Service<Request>,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;

    /// Reports ready only when every backend is ready.
    ///
    /// The first backend error is returned immediately. Otherwise all
    /// backends are polled and `Pending` is returned if any of them is
    /// still pending.
    fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
        use std::task::Poll;

        // Poll all services. `?` on a `Poll<Result<..>>` only propagates the
        // error and would silently drop a `Pending`, so match explicitly.
        let mut all_ready = true;
        for service in &mut self.services {
            match service.poll_ready(cx) {
                Poll::Ready(Ok(())) => {}
                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                Poll::Pending => all_ready = false,
            }
        }

        if all_ready {
            Poll::Ready(Ok(()))
        } else {
            Poll::Pending
        }
    }

    /// Forwards the request to the next backend in round-robin order.
    ///
    /// # Panics
    ///
    /// Panics if the balancer was built with no backends.
    fn call(&mut self, req: Request) -> Self::Future {
        assert!(
            !self.services.is_empty(),
            "LoadBalancer::call requires at least one backend"
        );
        // `&mut self` gives exclusive access, so the cursor can be advanced
        // without an atomic read-modify-write.
        let cursor = self.current.get_mut();
        let idx = *cursor % self.services.len();
        *cursor = cursor.wrapping_add(1);
        self.services[idx].call(req)
    }
}
#[tokio::main]
async fn main() {
    // All backends must share one concrete type to live in the same `Vec`,
    // and every closure literal has its own distinct type, so both backends
    // are produced by a single factory closure.
    let backend = |name: &'static str| {
        tower::service_fn(move |_: ()| async move {
            Ok::<_, std::convert::Infallible>(name)
        })
    };
    let mut lb = LoadBalancer::new(vec![backend("Service 1"), backend("Service 2")]);

    for _ in 0..4 {
        let response = lb.ready().await.unwrap().call(()).await.unwrap();
        println!("Response: {}", response);
    }
}
Service trait: async function taking Request, returning Response
Layer trait: transforms one service into another
service_fn wraps closures as services
ServiceBuilder chains multiple layers together
poll_ready checks if service can accept requests
call processes the request asynchronously
Box::pin for recursive async in middleware