How does hyper::server::conn::Http::serve_connection enable custom HTTP protocol implementations?

serve_connection decouples HTTP protocol handling from the transport. It accepts any stream that implements AsyncRead + AsyncWrite (plus Unpin) together with a Service implementation, and drives the HTTP exchange over that stream: parsing requests, framing responses, managing keep-alive, handling Expect: 100-continue, and supporting protocol upgrades. Because the stream type is generic, you can serve HTTP over TCP sockets, TLS streams, Unix domain sockets, or custom transports, with any service that turns requests into responses. The caller retains control over everything around the stream: connection acceptance, TLS termination, and stream lifecycle.

Basic serve_connection Usage

use hyper::server::conn::Http;
use hyper::service::service_fn;
use hyper::{Body, Request, Response};
use tokio::net::TcpListener;
use std::convert::Infallible;
 
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:3000").await?;
    
    // Create the HTTP protocol handler
    let http = Http::new();
    
    loop {
        // Accept connections manually
        let (stream, _addr) = listener.accept().await?;

        // Create a service for this connection
        let service = service_fn(|_req: Request<Body>| async {
            Ok::<_, Infallible>(Response::new(Body::from("Hello World")))
        });

        // Http is Clone; each spawned task needs its own copy because
        // the `async move` block takes ownership of what it captures
        let http = http.clone();

        // serve_connection handles HTTP protocol over the stream
        tokio::spawn(async move {
            // This handles:
            // - HTTP request parsing
            // - Response framing
            // - Keep-alive
            // - Connection errors
            if let Err(e) = http.serve_connection(stream, service).await {
                eprintln!("Connection error: {}", e);
            }
        });
    }
}

serve_connection takes a stream and service, handling HTTP protocol while you control the transport.

The Http Protocol Type

use hyper::server::conn::Http;
 
fn demonstrate_http_builder() {
    // Http configures protocol behavior. The builder methods take
    // `&mut self`, so bind the value first and then configure it.
    let mut http = Http::new();
    http.http1_half_close(true)            // Allow HTTP/1 half-close
        .http1_keep_alive(true)            // Enable keep-alive
        .http1_title_case_headers(false)   // Do not title-case header names when writing
        .http1_preserve_header_case(true)  // Preserve original header case
        .max_buf_size(8192);               // Max buffer size for reading

    // These settings control HTTP/1.x protocol behavior
    // serve_connection uses these settings for all connections

    // Note: the same Http type also serves HTTP/2 (with the "http2" feature);
    // use http2_only(true) to accept only HTTP/2
}
 
#[tokio::main]
async fn example_with_configured_http() {
    // Builder methods return `&mut Self`, so configure a named binding
    let mut http = Http::new();
    http.http1_half_close(true)
        .http1_keep_alive(true);
    
    // The configured Http instance can serve multiple connections
    // Each connection uses the same protocol configuration
}

Http::new() creates a configurable protocol handler; the http1_* options shown here tune HTTP/1.x connection behavior.
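The http1_keep_alive option toggles HTTP/1 connection persistence. As a rough illustration of what persistence means on the wire, the sketch below applies the standard HTTP/1.x rules: HTTP/1.1 stays open unless "close" is signalled, HTTP/1.0 closes unless "keep-alive" is requested. The names Version and wants_keep_alive are hypothetical, and this is a simplified model, not hyper's internal code.

```rust
/// HTTP versions relevant to HTTP/1.x persistence rules (hypothetical type).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Version {
    Http10,
    Http11,
}

/// Returns true if the connection should stay open after this exchange.
/// `connection` is the value of the Connection header, if present.
pub fn wants_keep_alive(version: Version, connection: Option<&str>) -> bool {
    // The Connection header is a comma-separated list of case-insensitive tokens
    let has_token = |token: &str| {
        connection
            .map(|value| value.split(',').any(|t| t.trim().eq_ignore_ascii_case(token)))
            .unwrap_or(false)
    };

    match version {
        // HTTP/1.1 connections are persistent unless "close" is signalled
        Version::Http11 => !has_token("close"),
        // HTTP/1.0 connections close unless "keep-alive" is requested
        Version::Http10 => has_token("keep-alive"),
    }
}
```

With hyper you never write this logic yourself; the point is only that the decision depends on the request, not on the transport underneath.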

Custom Transport Streams

use hyper::server::conn::Http;
use hyper::service::service_fn;
use hyper::{Body, Request, Response};
use tokio::net::TcpListener;
use std::convert::Infallible;
 
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let http = Http::new();
    
    // TCP sockets work directly
    let tcp_listener = TcpListener::bind("127.0.0.1:3000").await?;
    
    // Unix domain sockets also work
    // (requires tokio::net::UnixListener)
    // let unix_listener = UnixListener::bind("/tmp/mysocket")?;
    
    // Any AsyncRead + AsyncWrite stream works
    // - TcpStream (from tokio)
    // - UnixStream (from tokio)
    // - TlsStream (from tokio-rustls)
    // - Custom streams
    
    loop {
        let (stream, _addr) = tcp_listener.accept().await?;
        
        let service = service_fn(|req: Request<Body>| async {
            Ok::<_, Infallible>(Response::new(Body::from("Hello")))
        });
        
        // Clone the handler: each spawned task takes ownership of its copy
        let http = http.clone();
        tokio::spawn(async move {
            // serve_connection is transport-agnostic
            // It only requires AsyncRead + AsyncWrite
            let _ = http.serve_connection(stream, service).await;
        });
    }
}

serve_connection works with any AsyncRead + AsyncWrite stream, enabling TCP, Unix sockets, or custom transports.
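To make the transport-agnostic idea concrete, here is a minimal synchronous analogue using only the standard library. It is a sketch under stated assumptions, not hyper's implementation: MemoryStream and serve_one are hypothetical names, the parser reads only the request line, and std's Read + Write stand in for AsyncRead + AsyncWrite.

```rust
use std::io::{self, BufRead, BufReader, Cursor, Read, Write};

/// In-memory stream: reads come from `input`, writes go to `output`.
/// Hypothetical test transport standing in for a socket.
pub struct MemoryStream {
    input: Cursor<Vec<u8>>,
    pub output: Vec<u8>,
}

impl MemoryStream {
    pub fn new(request: &str) -> Self {
        MemoryStream {
            input: Cursor::new(request.as_bytes().to_vec()),
            output: Vec::new(),
        }
    }
}

impl Read for MemoryStream {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.input.read(buf)
    }
}

impl Write for MemoryStream {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.output.write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Serves exactly one request. The only requirement on `stream` is the
/// `Read + Write` bound, so any transport that satisfies it will do.
pub fn serve_one<S, H>(stream: &mut S, handler: H) -> io::Result<()>
where
    S: Read + Write,
    H: Fn(&str, &str) -> String,
{
    // Read the request line, e.g. "GET /health HTTP/1.1"
    let mut request_line = String::new();
    BufReader::new(&mut *stream).read_line(&mut request_line)?;

    let mut parts = request_line.split_whitespace();
    let method = parts.next().unwrap_or("");
    let path = parts.next().unwrap_or("");

    // Delegate request handling, then frame the response
    let body = handler(method, path);
    write!(
        stream,
        "HTTP/1.1 200 OK\r\ncontent-length: {}\r\n\r\n{}",
        body.len(),
        body
    )?;
    stream.flush()
}
```

Because serve_one never names a concrete stream type, the same function can be exercised against an in-memory buffer in tests and a real socket in production, which is the same property serve_connection offers at the async level.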

TLS Termination Before HTTP

use hyper::server::conn::Http;
use hyper::service::service_fn;
use hyper::{Body, Request, Response};
use std::convert::Infallible;
use tokio::net::TcpListener;
 
// Example with tokio-rustls (conceptual - requires rustls dependency)
// use tokio_rustls::{TlsAcceptor, server::TlsStream};
 
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let http = Http::new();
    let listener = TcpListener::bind("127.0.0.1:3000").await?;
    
    // TLS acceptor configuration would go here
    // let acceptor = TlsAcceptor::from(config);
    
    loop {
        let (tcp_stream, _addr) = listener.accept().await?;
        
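        // Clone per connection: the spawned task takes ownership
        let http = http.clone();
        // let acceptor = acceptor.clone();

        tokio::spawn(async move {
            // Perform the TLS handshake before HTTP, inside the task, so a
            // slow or failed handshake does not stall the accept loop
            // let tls_stream = match acceptor.accept(tcp_stream).await {
            //     Ok(s) => s,
            //     Err(e) => { eprintln!("TLS error: {}", e); return; }
            // };

            let service = service_fn(|_req: Request<Body>| async {
                Ok::<_, Infallible>(Response::new(Body::from("HTTPS")))
            });

            // serve_connection handles HTTP over the TLS stream
            // let _ = http.serve_connection(tls_stream, service).await;

            // For this example, just use TCP directly
            let _ = http.serve_connection(tcp_stream, service).await;
        });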
    }
}

TLS termination happens before serve_connection, allowing you to control certificate handling and TLS configuration.

Service Implementation for Custom Handlers

use hyper::server::conn::Http;
use hyper::service::service_fn;
use hyper::{Body, Request, Response, Method, StatusCode};
use std::convert::Infallible;
 
// Simple service function (most common pattern)
async fn handle_request(req: Request<Body>) -> Result<Response<Body>, Infallible> {
    match (req.method(), req.uri().path()) {
        (&Method::GET, "/") => Ok(Response::new(Body::from("Home"))),
        (&Method::GET, "/health") => Ok(Response::new(Body::from("OK"))),
        (&Method::POST, "/echo") => Ok(Response::new(req.into_body())),
        _ => Ok(Response::builder()
            .status(StatusCode::NOT_FOUND)
            .body(Body::from("Not Found"))
            .unwrap()),
    }
}
 
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let http = Http::new();
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
    
    loop {
        let (stream, _) = listener.accept().await?;
        
        // service_fn wraps any async function
        let service = service_fn(handle_request);
        
        // Clone the handler: each spawned task takes ownership of its copy
        let http = http.clone();
        tokio::spawn(async move {
            let _ = http.serve_connection(stream, service).await;
        });
    }
}

service_fn wraps an async function; custom Service implementations enable more complex request handling.

Custom Service Implementation

use hyper::server::conn::Http;
use hyper::service::Service;
use hyper::{Body, Request, Response, StatusCode};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
 
// Custom service with shared state
struct MyService {
    state: Arc<AppState>,
}
 
struct AppState {
    counter: std::sync::atomic::AtomicU64,
}
 
impl Service<Request<Body>> for MyService {
    type Response = Response<Body>;
    type Error = hyper::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
    
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
    
    fn call(&mut self, req: Request<Body>) -> Self::Future {
        let state = Arc::clone(&self.state);
        
        Box::pin(async move {
            // Access shared state in the handler
            let count = state.counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            
            let response = Response::builder()
                .status(StatusCode::OK)
                .body(Body::from(format!("Request #{}", count)))
                .unwrap();
            
            Ok(response)
        })
    }
}
 
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let http = Http::new();
    let state = Arc::new(AppState {
        counter: std::sync::atomic::AtomicU64::new(0),
    });
    
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
    
    loop {
        let (stream, _) = listener.accept().await?;
        let state = Arc::clone(&state);
        // Http is Clone; give each spawned task its own copy
        let http = http.clone();

        tokio::spawn(async move {
            // Create a new service instance per connection
            let service = MyService { state };
            let _ = http.serve_connection(stream, service).await;
        });
    }
}

Implement Service directly for services that need shared state or custom behavior.

Connection Upgrades and WebSockets

use hyper::server::conn::Http;
use hyper::service::service_fn;
use hyper::{Body, Request, Response, StatusCode};
use std::convert::Infallible;
 
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let http = Http::new();
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
    
    loop {
        let (stream, _) = listener.accept().await?;
        
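        let http = http.clone();

        tokio::spawn(async move {
            // `async move` is required: the returned future must own `req`
            let service = service_fn(|mut req: Request<Body>| async move {
                // Check for an upgrade request
                if req.headers().contains_key(hyper::header::UPGRADE) {
                    // hyper::upgrade::on returns a future that resolves to the
                    // raw connection once the 101 response has been written
                    let on_upgrade = hyper::upgrade::on(&mut req);
                    tokio::spawn(async move {
                        match on_upgrade.await {
                            Ok(_upgraded) => { /* speak the new protocol on `_upgraded` */ }
                            Err(e) => eprintln!("Upgrade error: {}", e),
                        }
                    });

                    // Return 101 Switching Protocols. A real WebSocket handshake
                    // must also validate the request and send Sec-WebSocket-Accept.
                    let mut response = Response::new(Body::empty());
                    *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
                    response.headers_mut().insert(
                        hyper::header::CONNECTION,
                        "upgrade".parse().unwrap()
                    );
                    response.headers_mut().insert(
                        hyper::header::UPGRADE,
                        "websocket".parse().unwrap()
                    );
                    return Ok::<_, Infallible>(response);
                }

                Ok::<_, Infallible>(Response::new(Body::from("HTTP")))
            });

            // with_upgrades() is required for the connection to support upgrades
            let result = http.serve_connection(stream, service).with_upgrades().await;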
            
            match result {
                Ok(_) => println!("Connection closed normally"),
                Err(e) => eprintln!("Connection error: {}", e),
            }
        });
    }
}
 
// After upgrade, the stream is no longer HTTP
// You can use hyper::upgrade::on(msg) to get the upgraded stream
// Then handle the WebSocket (or other protocol) directly

With with_upgrades() enabled, serve_connection completes the HTTP side of an upgrade; hyper::upgrade::on then yields the raw stream, which you can use for other protocols.
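Checking only for the presence of an Upgrade header is a loose test. A somewhat stricter check, sketched here over plain (name, value) pairs rather than hyper's header types, requires a Connection header carrying the "upgrade" token and an Upgrade header naming "websocket". The helper names are hypothetical, and a complete WebSocket handshake would also validate Sec-WebSocket-Key and Sec-WebSocket-Version, which this sketch leaves out.

```rust
/// Looks up a header value by case-insensitive name (hypothetical helper).
fn header_value<'a>(headers: &[(&str, &'a str)], name: &str) -> Option<&'a str> {
    headers
        .iter()
        .find(|(k, _)| k.eq_ignore_ascii_case(name))
        .map(|(_, v)| *v)
}

/// True if a comma-separated header value contains `token`, ignoring case.
fn has_token(value: &str, token: &str) -> bool {
    value.split(',').any(|t| t.trim().eq_ignore_ascii_case(token))
}

/// True if the headers describe a WebSocket upgrade request.
/// Only the Connection and Upgrade headers are examined.
pub fn is_websocket_upgrade(headers: &[(&str, &str)]) -> bool {
    let connection_ok =
        header_value(headers, "connection").map_or(false, |v| has_token(v, "upgrade"));
    let upgrade_ok =
        header_value(headers, "upgrade").map_or(false, |v| has_token(v, "websocket"));
    connection_ok && upgrade_ok
}
```

A service would run a check like this before answering 101 Switching Protocols, and fall back to a normal HTTP response otherwise.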

Handling Expect: 100-continue

use hyper::server::conn::Http;
use hyper::service::service_fn;
use hyper::{Body, Request, Response};
use std::convert::Infallible;
 
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // No configuration is needed: hyper handles Expect: 100-continue itself
    let http = Http::new();
    
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
    
    loop {
        let (stream, _) = listener.accept().await?;
        
        let http = http.clone();

        tokio::spawn(async move {
            let service = service_fn(|req: Request<Body>| async move {
                // For a request with Expect: 100-continue, hyper sends
                // "100 Continue" lazily, the first time the service polls
                // the request body

                // Reading the body here triggers the interim response
                let _body = hyper::body::to_bytes(req.into_body()).await;

                // A service that responds without reading the body never
                // triggers 100 Continue, so a rejected large upload is not sent

                Ok::<_, Infallible>(Response::new(Body::from("OK")))
            });
            });
            
            // serve_connection manages the 100-continue flow
            let _ = http.serve_connection(stream, service).await;
        });
    }
}

serve_connection handles Expect: 100-continue automatically, sending 100 Continue when your service first polls the request body.
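In hyper the interim response is tied to the service reading the request body rather than to the arrival of the request head. The following toy model illustrates that ordering only; LazyContinueBody and its wire buffer are hypothetical and bear no relation to hyper's real types.

```rust
/// Hypothetical model of a request body that may owe the client an
/// interim "100 Continue" response. Not hyper's real types.
pub struct LazyContinueBody {
    expect_continue: bool,
    continue_sent: bool,
    data: Vec<u8>,
}

impl LazyContinueBody {
    pub fn new(expect_continue: bool, data: &[u8]) -> Self {
        LazyContinueBody {
            expect_continue,
            continue_sent: false,
            data: data.to_vec(),
        }
    }

    /// Reads the body. On the first call for a request that carried
    /// `Expect: 100-continue`, the interim response is written to `wire` first.
    pub fn read(&mut self, wire: &mut Vec<u8>) -> Vec<u8> {
        if self.expect_continue && !self.continue_sent {
            wire.extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n");
            self.continue_sent = true;
        }
        // Hand the buffered data to the caller, leaving the body empty
        std::mem::take(&mut self.data)
    }
}
```

If the handler never calls read, nothing is written to the wire buffer, which mirrors a service that rejects a request without asking the client to send its body.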

Graceful Shutdown with serve_connection

use hyper::server::conn::Http;
use hyper::service::service_fn;
use hyper::{Body, Request, Response};
use std::convert::Infallible;
use tokio::signal;
use tokio::sync::broadcast;
 
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let http = Http::new();
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
    
    // Create shutdown signal
    // Keep one receiver alive so send() succeeds even with no connections
    let (shutdown_tx, _shutdown_rx) = broadcast::channel::<()>(1);
    
    // Spawn connection handlers
    let mut connection_handles = Vec::new();
    
    loop {
        tokio::select! {
            // Accept new connections
            accept_result = listener.accept() => {
                let (stream, _) = accept_result?;
                let mut shutdown_rx = shutdown_tx.subscribe();
                let http = http.clone();

                let handle = tokio::spawn(async move {
                    let service = service_fn(|_req: Request<Body>| async {
                        Ok::<_, Infallible>(Response::new(Body::from("OK")))
                    });

                    // Pin the connection so it can be polled by reference and
                    // told to shut down while it is still being driven
                    let conn = http.serve_connection(stream, service);
                    tokio::pin!(conn);

                    tokio::select! {
                        // Connection finished on its own
                        _ = conn.as_mut() => {}
                        // Shutdown requested: finish in-flight work, then close
                        _ = shutdown_rx.recv() => {
                            conn.as_mut().graceful_shutdown();
                            let _ = conn.await;
                        }
                    }
                });
                
                connection_handles.push(handle);
            }
            
            // Handle shutdown signal
            _ = signal::ctrl_c() => {
                println!("Shutting down...");
                shutdown_tx.send(())?;
                break;
            }
        }
    }
    
    // Wait for all connections to finish
    for handle in connection_handles {
        let _ = handle.await;
    }
    
    println!("All connections closed");
    Ok(())
}

For graceful shutdown, signal each connection task and call graceful_shutdown on its Connection so in-flight requests can finish; each serve_connection task can then be awaited independently.

Comparison with Server::bind

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use std::convert::Infallible;
 
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // APPROACH 1: High-level Server::bind
    // Handles connection acceptance automatically
    // Less control over transport
    
    let addr = "127.0.0.1:3000".parse()?;
    
    let make_service = make_service_fn(|_conn| async {
        Ok::<_, Infallible>(service_fn(|_req: Request<Body>| async {
            Ok::<_, Infallible>(Response::new(Body::from("Hello")))
        }))
    });
    
    let server = Server::bind(&addr).serve(make_service);
    
    // server.await?;  // Runs until error or graceful shutdown
    
    // APPROACH 2: Low-level serve_connection
    // You handle connection acceptance
    // Full control over transport and TLS
    
    let http = hyper::server::conn::Http::new();
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3001").await?;
    
    loop {
        let (stream, _) = listener.accept().await?;
        
        // Clone the handler: each spawned task takes ownership of its copy
        let http = http.clone();
        tokio::spawn(async move {
            let service = service_fn(|_req: Request<Body>| async {
                Ok::<_, Infallible>(Response::new(Body::from("Hello")))
            });
            
            let _ = http.serve_connection(stream, service).await;
        });
    }
    
    // Key differences:
    // Server::bind:
    // - Handles TCP acceptance
    // - TCP only (other transports need Server::builder with a custom Accept)
    // - Simpler API
    // - Built-in graceful shutdown
    
    // serve_connection:
    // - You handle acceptance
    // - Any transport (TCP, TLS, Unix, custom)
    // - More control
    // - Manual graceful shutdown
}

Server::bind is simpler but less flexible; serve_connection gives full control over transport.

Custom Stream Implementation

use hyper::server::conn::Http;
use hyper::service::service_fn;
use hyper::{Body, Request, Response};
use std::convert::Infallible;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use std::pin::Pin;
use std::task::{Context, Poll};
 
// Custom stream that wraps another stream with logging
struct LoggingStream<S> {
    inner: S,
    id: u64,
}
 
impl<S: AsyncRead + Unpin> AsyncRead for LoggingStream<S> {
    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<std::io::Result<()>> {
        // Log before reading (in real code, you might track metrics)
        // println!("Stream {} reading", self.id);
        
        Pin::new(&mut self.inner).poll_read(cx, buf)
    }
}
 
impl<S: AsyncWrite + Unpin> AsyncWrite for LoggingStream<S> {
    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<std::io::Result<usize>> {
        // println!("Stream {} writing {} bytes", self.id, buf.len());
        Pin::new(&mut self.inner).poll_write(cx, buf)
    }
    
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        Pin::new(&mut self.inner).poll_flush(cx)
    }
    
    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        Pin::new(&mut self.inner).poll_shutdown(cx)
    }
}
 
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let http = Http::new();
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
    let mut stream_id = 0u64;
    
    loop {
        let (stream, _) = listener.accept().await?;
        stream_id += 1;
        
        // Wrap with custom stream
        let logging_stream = LoggingStream {
            inner: stream,
            id: stream_id,
        };
        
        // Clone the handler: each spawned task takes ownership of its copy
        let http = http.clone();
        tokio::spawn(async move {
            let service = service_fn(|_req: Request<Body>| async {
                Ok::<_, Infallible>(Response::new(Body::from("OK")))
            });
            
            // serve_connection works with our custom stream
            let _ = http.serve_connection(logging_stream, service).await;
        });
    }
}

Implement AsyncRead + AsyncWrite for custom streams; serve_connection works with any such implementation.

Synthesis

Architecture layers:

┌─────────────────────────────────────┐
│         Your Service Code           │  <- Request processing
│     (Request -> Response)           │
├─────────────────────────────────────┤
│         hyper::Service              │  <- Service trait
├─────────────────────────────────────┤
│     Http::serve_connection          │  <- HTTP protocol handling
│  (parsing, framing, keep-alive)     │
├─────────────────────────────────────┤
│     Stream (AsyncRead + AsyncWrite) │  <- Transport
│  (TcpStream, TlsStream, Unix, etc.) │
└─────────────────────────────────────┘

What serve_connection handles:

Concern                   Handled By
------------------------  ----------------
TCP accept                Caller
TLS handshake             Caller
HTTP/1.x parsing          serve_connection
Request framing           serve_connection
Response serialization    serve_connection
Keep-alive                serve_connection
100-continue              serve_connection
Protocol upgrade          serve_connection
Connection errors         serve_connection
Request handling          Service

When to use serve_connection:

  • Custom transports (Unix sockets, TLS, custom protocols)
  • Per-connection configuration
  • Connection-level metrics or logging
  • Protocol upgrades (WebSockets)
  • Integration with existing accept loops

When to use Server::bind:

  • Standard TCP servers
  • Simpler use cases
  • Built-in graceful shutdown

Key insight: serve_connection is the boundary between HTTP protocol handling and the transport. Everything above it is HTTP: parsing requests, routing, generating responses. Everything below it is raw bytes: TCP segments, TLS records, Unix socket I/O. This separation means you can use hyper's HTTP implementation with any transport that provides AsyncRead + AsyncWrite. You could supply a custom stream that logs all I/O, tunnels connections through a proxy, or carries the bytes over an entirely different channel.

The HTTP protocol details (parsing headers, handling content-length, managing chunked encoding, keep-alive) are all handled by serve_connection. You provide a Service implementation that receives Request objects and returns Response objects, fully abstracted from the wire format. This architecture lets hyper power not just traditional web servers but also proxies, load balancers, API gateways, and any system that needs HTTP over non-standard transports. The method represents a "lower half" of the traditional server stack: connection acceptance and transport are external, HTTP parsing is internal, and request handling is delegated to your service.
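As one concrete example of the wire-level detail that stays hidden from the service, here is a minimal sketch of HTTP/1.1 chunked transfer coding. The function name encode_chunked is hypothetical and the code is an illustration of the framing format, not hyper's encoder; trailers and chunk extensions are not covered.

```rust
/// Encodes body chunks using HTTP/1.1 chunked transfer coding.
pub fn encode_chunked(chunks: &[&[u8]]) -> Vec<u8> {
    let mut out = Vec::new();
    for chunk in chunks {
        // An empty chunk would be read as the terminator, so skip it
        if chunk.is_empty() {
            continue;
        }
        // Each chunk is "<size in hex>\r\n<data>\r\n"
        out.extend_from_slice(format!("{:x}\r\n", chunk.len()).as_bytes());
        out.extend_from_slice(chunk);
        out.extend_from_slice(b"\r\n");
    }
    // A zero-length chunk followed by an empty trailer section ends the body
    out.extend_from_slice(b"0\r\n\r\n");
    out
}
```

A service simply returns a body; whether it reaches the client with a content-length header or in chunks like these is a decision made below the Service boundary.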