Loading pageβ¦
Rust walkthroughs
Loading pageβ¦
hyper::server::conn::Http::serve_connection enable custom HTTP protocol implementations?serve_connection decouples HTTP protocol handling from transport by accepting any AsyncRead + AsyncWrite stream and a Service implementation, handling HTTP/1.x framing, connection lifecycle, and upgrade semantics while leaving transport details to the caller. This design enables custom protocol implementations by allowing you to provide any stream typeβTCP sockets, TLS streams, Unix domain sockets, WebSockets, or custom transportsβand any service implementation that processes HTTP requests. The method handles all HTTP protocol concerns: parsing requests, framing responses, managing keep-alive, handling Expect: 100-continue, and protocol upgrades, while the caller retains control over connection acceptance, TLS termination, and stream lifecycle.
use hyper::server::conn::Http;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use tokio::net::TcpListener;
use std::convert::Infallible;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:3000").await?;
// Create the HTTP protocol handler
let http = Http::new();
loop {
// Accept connections manually
let (stream, addr) = listener.accept().await?;
// Create a service for this connection
let service = service_fn(|req: Request<Body>| async {
Ok::<_, Infallible>(Response::new(Body::from("Hello World")))
});
// serve_connection handles HTTP protocol over the stream
tokio::spawn(async move {
// This handles:
// - HTTP request parsing
// - Response framing
// - Keep-alive
// - Connection errors
if let Err(e) = http.serve_connection(stream, service).await {
eprintln!("Connection error: {}", e);
}
});
}
}serve_connection takes a stream and service, handling HTTP protocol while you control the transport.
use hyper::server::conn::Http;
use hyper::{Body, Request, Response};
use std::convert::Infallible;
fn demonstrate_http_builder() {
// Http is configurable for HTTP/1.x protocol behavior
let http = Http::new()
.http1_half_close(true) // Allow HTTP/1 half-close
.http1_keep_alive(true) // Enable keep-alive
.http1_title_case_headers(false) // Use lowercase headers
.http1_preserve_header_case(true) // Preserve original header case
.max_buf_size(8192); // Max buffer size for reading
// These settings control HTTP/1.x protocol behavior
// serve_connection uses these settings for all connections
// Note: For HTTP/2, use Http2::new() instead
}
#[tokio::main]
async fn example_with_configured_http() {
let http = Http::new()
.http1_half_close(true)
.http1_keep_alive(true);
// The configured Http instance can serve multiple connections
// Each connection uses the same protocol configuration
}Http::new() creates a configurable HTTP/1.x protocol handler with options for connection behavior.
use hyper::server::conn::Http;
use hyper::service::service_fn;
use hyper::{Body, Request, Response};
use tokio::net::{TcpListener, UnixListener};
use tokio_util::compat::{TokioAsyncReadCompatExt, Compat};
use std::convert::Infallible;
use std::os::unix::io::AsRawFd;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let http = Http::new();
// TCP sockets work directly
let tcp_listener = TcpListener::bind("127.0.0.1:3000").await?;
// Unix domain sockets also work
// (requires tokio::net::UnixListener)
// let unix_listener = UnixListener::bind("/tmp/mysocket")?;
// Any AsyncRead + AsyncWrite stream works
// - TcpStream (from tokio)
// - UnixStream (from tokio)
// - TlsStream (from tokio-rustls)
// - Custom streams
loop {
let (stream, _addr) = tcp_listener.accept().await?;
let service = service_fn(|req: Request<Body>| async {
Ok::<_, Infallible>(Response::new(Body::from("Hello")))
});
tokio::spawn(async move {
// serve_connection is transport-agnostic
// It only requires AsyncRead + AsyncWrite
let _ = http.serve_connection(stream, service).await;
});
}
}serve_connection works with any AsyncRead + AsyncWrite stream, enabling TCP, Unix sockets, or custom transports.
use hyper::server::conn::Http;
use hyper::service::service_fn;
use hyper::{Body, Request, Response};
use std::convert::Infallible;
use tokio::net::TcpListener;
// Example with tokio-rustls (conceptual - requires rustls dependency)
// use tokio_rustls::{TlsAcceptor, server::TlsStream};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let http = Http::new();
let listener = TcpListener::bind("127.0.0.1:3000").await?;
// TLS acceptor configuration would go here
// let acceptor = TlsAcceptor::from(config);
loop {
let (tcp_stream, _addr) = listener.accept().await?;
// Perform TLS handshake before HTTP
// let tls_stream = acceptor.accept(tcp_stream).await?;
tokio::spawn(async move {
let service = service_fn(|req: Request<Body>| async {
Ok::<_, Infallible>(Response::new(Body::from("HTTPS")))
});
// serve_connection handles HTTP over the TLS stream
// let _ = http.serve_connection(tls_stream, service).await;
// For this example, just use TCP directly
let _ = http.serve_connection(tcp_stream, service).await;
});
}
}TLS termination happens before serve_connection, allowing you to control certificate handling and TLS configuration.
use hyper::server::conn::Http;
use hyper::service::{service_fn, Service};
use hyper::{Body, Request, Response, Method, StatusCode};
use std::convert::Infallible;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
// Simple service function (most common pattern)
async fn handle_request(req: Request<Body>) -> Result<Response<Body>, Infallible> {
match (req.method(), req.uri().path()) {
(&Method::GET, "/") => Ok(Response::new(Body::from("Home"))),
(&Method::GET, "/health") => Ok(Response::new(Body::from("OK"))),
(&Method::POST, "/echo") => Ok(Response::new(req.into_body())),
_ => Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from("Not Found"))
.unwrap()),
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let http = Http::new();
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
loop {
let (stream, _) = listener.accept().await?;
// service_fn wraps any async function
let service = service_fn(handle_request);
tokio::spawn(async move {
let _ = http.serve_connection(stream, service).await;
});
}
}service_fn wraps an async function; custom Service implementations enable more complex request handling.
use hyper::server::conn::Http;
use hyper::service::Service;
use hyper::{Body, Request, Response, StatusCode};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
// Custom service with shared state
struct MyService {
state: Arc<AppState>,
}
struct AppState {
counter: std::sync::atomic::AtomicU64,
}
impl Service<Request<Body>> for MyService {
type Response = Response<Body>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Request<Body>) -> Self::Future {
let state = Arc::clone(&self.state);
Box::pin(async move {
// Access shared state in the handler
let count = state.counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let response = Response::builder()
.status(StatusCode::OK)
.body(Body::from(format!("Request #{}", count)))
.unwrap();
Ok(response)
})
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let http = Http::new();
let state = Arc::new(AppState {
counter: std::sync::atomic::AtomicU64::new(0),
});
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
loop {
let (stream, _) = listener.accept().await?;
let state = Arc::clone(&state);
tokio::spawn(async move {
// Create a new service instance per connection
let service = MyService { state };
let _ = http.serve_connection(stream, service).await;
});
}
}Implement Service directly for services that need shared state or custom behavior.
use hyper::server::conn::Http;
use hyper::service::service_fn;
use hyper::{Body, Request, Response, StatusCode, upgrade::Upgraded};
use std::convert::Infallible;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let http = Http::new();
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
loop {
let (stream, _) = listener.accept().await?;
tokio::spawn(async move {
let service = service_fn(|req: Request<Body>| async {
// Check for WebSocket upgrade
if req.headers().contains_key("Upgrade") {
// initiate_upgrade(req).await
// serve_connection handles upgrade handshake
// Return 101 Switching Protocols
let mut response = Response::new(Body::empty());
*response.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
response.headers_mut().insert(
"Upgrade",
"websocket".parse().unwrap()
);
return Ok::<_, Infallible>(response);
}
Ok::<_, Infallible>(Response::new(Body::from("HTTP")))
});
// serve_connection returns Ok on upgrade
// You can then use the upgraded connection
let result = http.serve_connection(stream, service).await;
match result {
Ok(_) => println!("Connection closed normally"),
Err(e) => eprintln!("Connection error: {}", e),
}
});
}
}
// After upgrade, the stream is no longer HTTP
// You can use hyper::upgrade::on(msg) to get the upgraded stream
// Then handle the WebSocket (or other protocol) directlyserve_connection handles HTTP upgrade headers; after the upgrade, you can use the stream for other protocols.
use hyper::server::conn::Http;
use hyper::service::service_fn;
use hyper::{Body, Request, Response};
use std::convert::Infallible;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Configure HTTP/1 to handle 100-continue
let http = Http::new()
.http1_preserve_header_case(true)
.http1_half_close(true);
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
loop {
let (stream, _) = listener.accept().await?;
tokio::spawn(async move {
let service = service_fn(|req: Request<Body>| async {
// serve_connection handles Expect: 100-continue
// It sends "100 Continue" before calling the service
// if the request has an Expect header
// Your service receives the request after 100 Continue
// is sent, so you can process the body
// If you reject, the body won't be read (for large uploads)
Ok::<_, Infallible>(Response::new(Body::from("OK")))
});
// serve_connection manages the 100-continue flow
let _ = http.serve_connection(stream, service).await;
});
}
}serve_connection automatically handles Expect: 100-continue before invoking your service.
use hyper::server::conn::Http;
use hyper::service::service_fn;
use hyper::{Body, Request, Response};
use std::convert::Infallible;
use tokio::signal;
use tokio::sync::broadcast;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let http = Http::new();
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
// Create shutdown signal
let (shutdown_tx, mut shutdown_rx) = broadcast::channel::<()>(1);
// Spawn connection handlers
let mut connection_handles = Vec::new();
loop {
tokio::select! {
// Accept new connections
accept_result = listener.accept() => {
let (stream, _) = accept_result?;
let mut shutdown_rx = shutdown_tx.subscribe();
let handle = tokio::spawn(async move {
let service = service_fn(|_req: Request<Body>| async {
Ok::<_, Infallible>(Response::new(Body::from("OK")))
});
// serve_connection runs until connection closes or error
// For graceful shutdown, use with_graceful_shutdown
// (in hyper::server::Server, but here we'd need to
// manage it manually)
let _ = http.serve_connection(stream, service).await;
});
connection_handles.push(handle);
}
// Handle shutdown signal
_ = signal::ctrl_c() => {
println!("Shutting down...");
shutdown_tx.send(())?;
break;
}
}
}
// Wait for all connections to finish
for handle in connection_handles {
let _ = handle.await;
}
println!("All connections closed");
Ok(())
}For graceful shutdown, coordinate connection lifecycle; each serve_connection task can be awaited independently.
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use std::convert::Infallible;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// APPROACH 1: High-level Server::bind
// Handles connection acceptance automatically
// Less control over transport
let addr = "127.0.0.1:3000".parse()?;
let make_service = make_service_fn(|_conn| async {
Ok::<_, Infallible>(service_fn(|_req: Request<Body>| async {
Ok::<_, Infallible>(Response::new(Body::from("Hello")))
}))
});
let server = Server::bind(&addr).serve(make_service);
// server.await?; // Runs until error or graceful shutdown
// APPROACH 2: Low-level serve_connection
// You handle connection acceptance
// Full control over transport and TLS
let http = hyper::server::conn::Http::new();
let listener = tokio::net::TcpListener::bind("127.0.0.1:3001").await?;
loop {
let (stream, _) = listener.accept().await?;
tokio::spawn(async move {
let service = service_fn(|_req: Request<Body>| async {
Ok::<_, Infallible>(Response::new(Body::from("Hello")))
});
let _ = http.serve_connection(stream, service).await;
});
}
// Key differences:
// Server::bind:
// - Handles TCP acceptance
// - No custom transport
// - Simpler API
// - Built-in graceful shutdown
// serve_connection:
// - You handle acceptance
// - Any transport (TCP, TLS, Unix, custom)
// - More control
// - Manual graceful shutdown
}Server::bind is simpler but less flexible; serve_connection gives full control over transport.
use hyper::server::conn::Http;
use hyper::service::service_fn;
use hyper::{Body, Request, Response};
use std::convert::Infallible;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use std::pin::Pin;
use std::task::{Context, Poll};
// Custom stream that wraps another stream with logging
struct LoggingStream<S> {
inner: S,
id: u64,
}
impl<S: AsyncRead + Unpin> AsyncRead for LoggingStream<S> {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<std::io::Result<()>> {
// Log before reading (in real code, you might track metrics)
// println!("Stream {} reading", self.id);
Pin::new(&mut self.inner).poll_read(cx, buf)
}
}
impl<S: AsyncWrite + Unpin> AsyncWrite for LoggingStream<S> {
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<std::io::Result<usize>> {
// println!("Stream {} writing {} bytes", self.id, buf.len());
Pin::new(&mut self.inner).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.inner).poll_flush(cx)
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.inner).poll_shutdown(cx)
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let http = Http::new();
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
let mut stream_id = 0u64;
loop {
let (stream, _) = listener.accept().await?;
stream_id += 1;
// Wrap with custom stream
let logging_stream = LoggingStream {
inner: stream,
id: stream_id,
};
tokio::spawn(async move {
let service = service_fn(|_req: Request<Body>| async {
Ok::<_, Infallible>(Response::new(Body::from("OK")))
});
// serve_connection works with our custom stream
let _ = http.serve_connection(logging_stream, service).await;
});
}
}Implement AsyncRead + AsyncWrite for custom streams; serve_connection works with any such implementation.
Architecture layers:
βββββββββββββββββββββββββββββββββββββββ
β Your Service Code β <- Request processing
β (Request -> Response) β
βββββββββββββββββββββββββββββββββββββββ€
β hyper::Service β <- Service trait
βββββββββββββββββββββββββββββββββββββββ€
β Http::serve_connection β <- HTTP protocol handling
β (parsing, framing, keep-alive) β
βββββββββββββββββββββββββββββββββββββββ€
β Stream (AsyncRead + AsyncWrite) β <- Transport
β (TcpStream, TlsStream, Unix, etc.) β
βββββββββββββββββββββββββββββββββββββββ
What serve_connection handles:
| Concern | Handled By |
|---------|------------|
| TCP accept | Caller |
| TLS handshake | Caller |
| HTTP/1.x parsing | serve_connection |
| Request framing | serve_connection |
| Response serialization | serve_connection |
| Keep-alive | serve_connection |
| 100-continue | serve_connection |
| Protocol upgrade | serve_connection |
| Connection errors | serve_connection |
| Request handling | Service |
When to use serve_connection:
When to use Server::bind:
Key insight: serve_connection is the boundary between HTTP protocol handling and transport abstraction. Everything above it is HTTP: parsing requests, routing, generating responses. Everything below it is raw bytes: TCP packets, TLS frames, Unix socket I/O. This separation means you can use hyper's HTTP implementation with any transport that provides AsyncRead + AsyncWrite. You could implement a custom stream that logs all I/O, wraps connections through a proxy, or implements a completely different transport like QUIC. The HTTP protocol detailsβparsing headers, handling content-length, managing chunked encoding, keep-alive timersβare all handled by serve_connection. You provide a Service implementation that receives Request objects and returns Response objects, completely abstracted from the wire format. This architecture enables hyper to power not just traditional web servers but also proxies, load balancers, API gateways, and any system that needs HTTP over non-standard transports. The method represents a "lower half" of the traditional server stack: connection acceptance and transport are external, HTTP parsing is internal, and request handling is delegated to your service.