How does hyper::body::Body::on_upgrade handle HTTP/1.1 connection upgrades?

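Body::on_upgrade returns a future that resolves once an HTTP/1.1 connection has been upgraded (for example to WebSocket), handing you an Upgraded value that owns the underlying IO after the HTTP handshake completes. From that point the connection no longer speaks HTTP, so it cannot carry ordinary request or response bodies. This is the mechanism for taking over an HTTP connection for a protocol switch such as WebSocket. Note that Body::on_upgrade is the hyper 0.13 spelling and takes the body by value; hyper 0.14 replaced it with the free function hyper::upgrade::on, which accepts a request or a response, and the examples here use that form. A server takes the upgrade from the incoming request, and a client takes it from the response it receives.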

HTTP Connection Upgrades

// HTTP/1.1 supports protocol upgrades via the Upgrade header
// 
// Client sends:
// GET /chat HTTP/1.1
// Host: server.example.com
// Upgrade: websocket
// Connection: Upgrade
// 
// Server responds:
// HTTP/1.1 101 Switching Protocols
// Upgrade: websocket
// Connection: Upgrade
// 
// After 101 response, the connection is no longer HTTP
// It's now speaking the new protocol (e.g., WebSocket)

HTTP upgrades allow switching from HTTP to another protocol on the same connection.
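
Before any hyper API is involved, a server has to decide whether a request is asking for an upgrade at all. The following is a minimal, standard-library-only sketch of that check, assuming the headers are already available as name/value pairs; `requested_upgrade` is a hypothetical helper, not part of hyper.

```rust
/// Returns the protocols listed in `Upgrade`, or None when the headers
/// do not describe an upgrade request.
fn requested_upgrade(headers: &[(&str, &str)]) -> Option<Vec<String>> {
    // Header names are case-insensitive.
    let find = |name: &str| {
        headers
            .iter()
            .find(|(k, _)| k.eq_ignore_ascii_case(name))
            .map(|(_, v)| *v)
    };

    // `Connection` is a comma-separated token list that must include `upgrade`.
    let connection = find("connection")?;
    let has_upgrade_token = connection
        .split(',')
        .any(|t| t.trim().eq_ignore_ascii_case("upgrade"));
    if !has_upgrade_token {
        return None;
    }

    // `Upgrade` lists one or more protocols in order of preference.
    let protocols: Vec<String> = find("upgrade")?
        .split(',')
        .map(|p| p.trim().to_ascii_lowercase())
        .filter(|p| !p.is_empty())
        .collect();

    if protocols.is_empty() {
        None
    } else {
        Some(protocols)
    }
}
```

Requiring the `upgrade` token in `Connection` as well as the `Upgrade` header mirrors the request shown above, where both headers are sent together.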

The on_upgrade Method

use hyper::{Body, Request};
use hyper::upgrade::Upgraded;

async fn handle_upgrade_example(req: Request<Body>) {
    // hyper 0.13: req.into_body().on_upgrade() returns an OnUpgrade future.
    // hyper 0.14: hyper::upgrade::on(req) replaces it and is used here.
    // Neither returns an Option; if no upgrade happens, the future
    // resolves to an Err instead.
    let on_upgrade = hyper::upgrade::on(req);

    // This future resolves when the upgrade is complete
    match on_upgrade.await {
        Ok(upgraded) => {
            // upgraded is an Upgraded value
            // It wraps the underlying IO (TcpStream, etc.)
            // You can now use it for the new protocol
            let _upgraded: Upgraded = upgraded;
        }
        Err(e) => {
            eprintln!("Upgrade failed: {}", e);
        }
    }
}

on_upgrade returns an OnUpgrade future, not an Option; if the connection is never upgraded, awaiting the future yields an error.

The Upgraded Type

use hyper::upgrade::Upgraded;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

async fn use_upgraded(mut upgraded: Upgraded) -> std::io::Result<()> {
    // Upgraded implements AsyncRead + AsyncWrite (hyper 0.14), so it can
    // be used generically through AsyncReadExt/AsyncWriteExt
    let mut buf = [0u8; 1024];
    let n = upgraded.read(&mut buf).await?;
    upgraded.write_all(&buf[..n]).await?;

    // downcast consumes the Upgraded and returns the concrete IO type.
    // It only succeeds if the connection was really built on that type;
    // on failure the original Upgraded is handed back.
    match upgraded.downcast::<TcpStream>() {
        Ok(parts) => {
            // parts.io is the TcpStream; parts.read_buf holds bytes that
            // were already read from the socket but not yet consumed
            println!("Peer address: {}", parts.io.peer_addr()?);
        }
        Err(_upgraded) => {
            // The underlying IO is some other type
        }
    }

    Ok(())
}

The Upgraded type wraps the underlying IO and provides downcasting.
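
One detail of downcasting is easy to miss: in hyper 0.14 a successful downcast yields a Parts value containing both the raw IO and a read_buf of bytes that hyper had already read past the HTTP headers. Those bytes belong to the new protocol and need to be consumed before reading from the raw IO again. Below is a synchronous, standard-library-only sketch of that idea; `rejoin` is a hypothetical helper and a `Vec<u8>` stands in for the buffered bytes.

```rust
use std::io::{Cursor, Read};

/// Rebuilds a single reader from bytes that were already buffered and
/// the raw IO they were read from, so no early protocol data is lost.
fn rejoin<R: Read>(read_buf: Vec<u8>, io: R) -> impl Read {
    // Serve the buffered bytes first, then continue with the raw IO.
    Cursor::new(read_buf).chain(io)
}
```

Reading from the value returned by `rejoin` yields the buffered bytes followed by whatever the raw IO produces. Using the Upgraded value directly, without downcasting, avoids the problem because it keeps that buffer internally.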

WebSocket Upgrade Example

use futures_util::{SinkExt, StreamExt};
use hyper::header::{HeaderValue, CONNECTION, UPGRADE};
use hyper::upgrade::Upgraded;
use hyper::{Body, Request, Response, StatusCode};
use tokio_tungstenite::tungstenite::protocol::Role;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::WebSocketStream;

async fn websocket_upgrade_handler(
    mut req: Request<Body>
) -> Result<Response<Body>, hyper::Error> {

    // Check if this is a WebSocket upgrade request
    if !req.headers().contains_key(UPGRADE) {
        return Ok(Response::new(Body::from("Not a WebSocket request")));
    }

    // Get the upgrade future from the request before returning the response
    let upgrade = hyper::upgrade::on(&mut req);

    // Spawn a task to handle the upgraded connection
    tokio::spawn(async move {
        match upgrade.await {
            Ok(upgraded) => {
                // Convert to WebSocket
                let ws_stream = WebSocketStream::from_raw_socket(
                    upgraded,
                    Role::Server,
                    None,
                ).await;

                // Handle WebSocket messages
                handle_websocket(ws_stream).await;
            }
            Err(e) => {
                eprintln!("Upgrade failed: {}", e);
            }
        }
    });

    // Create a 101 Switching Protocols response with the required headers.
    // A complete WebSocket handshake also needs Sec-WebSocket-Accept.
    let mut response = Response::new(Body::empty());
    *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
    response.headers_mut().insert(UPGRADE, HeaderValue::from_static("websocket"));
    response.headers_mut().insert(CONNECTION, HeaderValue::from_static("Upgrade"));

    Ok(response)
}

async fn handle_websocket(mut ws_stream: WebSocketStream<Upgraded>) {
    // next() comes from StreamExt and send() from SinkExt
    while let Some(msg) = ws_stream.next().await {
        match msg {
            Ok(Message::Text(text)) => {
                // Echo back
                ws_stream.send(Message::Text(text)).await.ok();
            }
            Ok(Message::Binary(data)) => {
                ws_stream.send(Message::Binary(data)).await.ok();
            }
            Ok(Message::Close(_)) => break,
            Err(e) => {
                eprintln!("WebSocket error: {}", e);
                break;
            }
            _ => {}
        }
    }
}

WebSocket is the most common use case for HTTP upgrades.

The Body is Consumed

use hyper::{Body, Request};

async fn body_consumed_example(req: Request<Body>) {
    // Passing the request by value moves it, body included, into the
    // upgrade future (hyper 0.13's Body::on_upgrade likewise took `self`)
    let upgrade = hyper::upgrade::on(req);

    // `req` has been moved, so its body can no longer be read.
    // This would not compile:
    // let bytes = hyper::body::to_bytes(req.into_body()).await;

    // If you need the request body, read it before taking the upgrade,
    // or pass `&mut req` to hyper::upgrade::on to keep the request.

    // Once the 101 response is sent, the HTTP layer is "done":
    // the connection is now speaking a different protocol
    let _ = upgrade;
}

Once on_upgrade has taken the body, it can no longer be read as HTTP data; after the 101 response the connection carries the new protocol.

Server-Side Upgrade Handling

use hyper::{Body, Request, Response, StatusCode};
use hyper::server::Server;
use hyper::service::{make_service_fn, service_fn};
 
async fn upgrade_server() {
    let addr = ([127, 0, 0, 1], 3000).into();
    
    let server = Server::bind(&addr).serve(make_service_fn(|_conn| async {
        Ok::<_, hyper::Error>(service_fn(handle_request))
    }));
    
    server.await.unwrap();
}
 
async fn handle_request(mut req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
    // Check for Upgrade header in request (the value is case-insensitive)
    let wants_upgrade = req.headers()
        .get("Upgrade")
        .and_then(|v| v.to_str().ok())
        .map(|v| v.eq_ignore_ascii_case("websocket"))
        .unwrap_or(false);

    if wants_upgrade {
        // Take the upgrade future from the request
        let upgrade = hyper::upgrade::on(&mut req);

        // Spawn handler; it runs once hyper has sent the 101 response
        tokio::spawn(async move {
            if let Ok(upgraded) = upgrade.await {
                handle_upgraded_connection(upgraded).await;
            }
        });

        // Create upgrade response. Response::new avoids the builder's
        // http::Error, which does not convert into hyper::Error.
        let mut response = Response::new(Body::empty());
        *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
        response.headers_mut().insert("Upgrade", "websocket".parse().unwrap());
        response.headers_mut().insert("Connection", "Upgrade".parse().unwrap());

        Ok(response)
    } else {
        // Normal HTTP request
        Ok(Response::new(Body::from("Hello HTTP")))
    }
}
 
async fn handle_upgraded_connection(upgraded: hyper::upgrade::Upgraded) {
    // Use upgraded for custom protocol
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    
    let mut upgraded = upgraded;
    let mut buf = [0u8; 1024];
    
    loop {
        let n = match upgraded.read(&mut buf).await {
            Ok(n) if n > 0 => n,
            _ => break,
        };
        
        // Echo back
        if upgraded.write_all(&buf[..n]).await.is_err() {
            break;
        }
    }
}

Server-side upgrade handling requires checking headers and spawning handlers.

Client-Side Upgrade

use hyper::{Body, Client, Request, StatusCode};
use hyper::upgrade::Upgraded;

type Error = Box<dyn std::error::Error + Send + Sync>;

async fn client_upgrade() -> Result<(), Error> {
    let client = Client::new();

    // Build an upgrade request
    let request = Request::builder()
        .method("GET")
        .uri("http://localhost:3000/chat")
        .header("Upgrade", "websocket")
        .header("Connection", "Upgrade")
        .body(Body::empty())?;

    // Send the request
    let response = client.request(request).await?;

    // Check if upgrade was accepted
    if response.status() == StatusCode::SWITCHING_PROTOCOLS {
        // On the client, the upgraded connection comes from the response
        let upgraded = hyper::upgrade::on(response).await?;

        // Now use upgraded for WebSocket or other protocol
        handle_client_upgraded(upgraded).await;
    }

    Ok(())
}

// Placeholder: speak the negotiated protocol over the upgraded connection
async fn handle_client_upgraded(_upgraded: Upgraded) {}

Clients initiate upgrades with the same mechanism, taking the upgraded connection from the response rather than from the request.

The on_upgrade Timing

use hyper::{Body, Request, Response, StatusCode};

async fn upgrade_timing(mut req: Request<Body>) -> Response<Body> {
    // STEP 1: Get the upgrade future from the request
    let upgrade = hyper::upgrade::on(&mut req);

    // STEP 2: Await it on a separate task. Awaiting it right here would
    // stall, because it cannot resolve until the 101 response below
    // has been returned and written
    tokio::spawn(async move {
        // The sequence is:
        // 1. Client sends upgrade request
        // 2. Server calls hyper::upgrade::on() to get the future
        // 3. Server returns the 101 response
        // 4. Hyper sends 101 headers
        // 5. Upgrade future resolves
        // 6. Server can now use upgraded connection
        if let Ok(upgraded) = upgrade.await {
            // NOW you can use upgraded
            let _ = upgraded;
        }
    });

    // STEP 3: Return the response to hyper, which sends the 101 headers
    let mut response = Response::new(Body::empty());
    *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
    response
}

The upgrade future resolves after response headers are sent.
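
The ordering can be modelled without hyper at all. The sketch below is a toy, standard-library-only simulation, not hyper's implementation: a thread stands in for the spawned task, a channel stands in for the OnUpgrade future, and the calling thread plays both the handler and the connection driver. All names are hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

/// Runs a toy "handler", "connection driver", and "upgrade task",
/// and returns the events in the order they happened.
fn simulate_upgrade_order() -> Vec<&'static str> {
    let (events_tx, events_rx) = mpsc::channel();
    // Stand-in for the OnUpgrade future: fulfilled by the driver.
    let (upgrade_tx, upgrade_rx) = mpsc::channel::<()>();

    // The handler spawns a task that waits for the upgrade...
    let task_events = events_tx.clone();
    let task = thread::spawn(move || {
        upgrade_rx.recv().expect("driver dropped the upgrade");
        task_events.send("task: upgrade resolved").unwrap();
    });

    // ...and then returns the 101 response to the driver.
    events_tx.send("handler: returned 101 response").unwrap();

    // The driver writes the response head and only then fulfils the upgrade.
    events_tx.send("driver: wrote 101 headers").unwrap();
    upgrade_tx.send(()).unwrap();

    task.join().unwrap();
    drop(events_tx); // close the channel so the iterator below ends
    events_rx.iter().collect()
}
```

The task can only make progress after the driver has written the headers, which is why the upgrade is awaited on a separate task rather than inside the handler before it returns.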

Comparison: on_upgrade vs Regular Body

use hyper::{Body, Request, Response};

async fn compare_usage(upgrade_req: Request<Body>) -> Result<(), hyper::Error> {
    // NORMAL RESPONSE: Read body data
    let normal_response: Response<Body> = Response::new(Body::from("Hello"));

    // Body contains data
    let bytes = hyper::body::to_bytes(normal_response.into_body()).await?;
    println!("Body: {:?}", bytes);

    // UPGRADE: Get upgraded connection
    // There is no Option to inspect; a message that is not part of an
    // upgrade makes the future resolve to an Err
    match hyper::upgrade::on(upgrade_req).await {
        Ok(upgraded) => {
            // The connection has been upgraded
            // Use upgraded for new protocol
            // Cannot read an HTTP body - there is none
            let _ = upgraded;
        }
        Err(e) => {
            // No upgrade took place - handle as a normal exchange
            eprintln!("No upgrade: {}", e);
        }
    }

    Ok(())
}

For a message that is not part of an upgrade, the on_upgrade future resolves to an error rather than to an Upgraded connection.

Handling Multiple Upgrade Protocols

use hyper::header::{HeaderValue, CONNECTION, UPGRADE};
use hyper::{Body, Request, Response, StatusCode};

async fn handle_multiple_protocols(req: Request<Body>) -> Response<Body> {
    // Copy the header value out so `req` can be moved into a handler
    let protocol = req.headers()
        .get(UPGRADE)
        .and_then(|v| v.to_str().ok())
        .map(|v| v.to_ascii_lowercase());

    match protocol.as_deref() {
        Some("websocket") => handle_websocket_upgrade(req),
        // HTTP/2 cleartext upgrade is not implemented in this sketch
        Some("h2c") => handle_h2c_upgrade(req),
        Some(custom) => handle_custom_upgrade(req, custom),
        None => Response::new(Body::from("No upgrade requested")),
    }
}

// Builds a bare 101 response for the given protocol.
// Each handler would also take hyper::upgrade::on(&mut req) and spawn
// a task for the upgraded connection, as in the server example.
fn switching_protocols(protocol: &str) -> Response<Body> {
    let mut response = Response::new(Body::empty());
    *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
    // The value was read from a valid header, so parsing it back succeeds
    response.headers_mut().insert(UPGRADE, protocol.parse().unwrap());
    response.headers_mut().insert(CONNECTION, HeaderValue::from_static("Upgrade"));
    response
}

fn handle_websocket_upgrade(req: Request<Body>) -> Response<Body> {
    let response = switching_protocols("websocket");

    // Add WebSocket-specific headers
    if let Some(_key) = req.headers().get("Sec-WebSocket-Key") {
        // Compute Sec-WebSocket-Accept
        // response.headers_mut().insert(...)
    }

    response
}

fn handle_h2c_upgrade(_req: Request<Body>) -> Response<Body> {
    let mut response = Response::new(Body::from("h2c upgrade not supported"));
    *response.status_mut() = StatusCode::NOT_IMPLEMENTED;
    response
}

fn handle_custom_upgrade(_req: Request<Body>, protocol: &str) -> Response<Body> {
    switching_protocols(protocol)
}

Servers can handle different upgrade protocols based on the Upgrade header.
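
An Upgrade header may list several protocols in the client's order of preference, so dispatching on the whole header value only works when a single protocol is offered. A minimal, standard-library-only sketch of the selection step follows; `select_protocol` is a hypothetical helper and the supported list is whatever the server chooses to implement.

```rust
/// Picks the first protocol offered in the client's `Upgrade` header
/// that the server supports, honouring the client's preference order.
fn select_protocol<'a>(upgrade_header: &str, supported: &[&'a str]) -> Option<&'a str> {
    upgrade_header
        .split(',')
        .map(str::trim)
        .find_map(|offered| {
            supported
                .iter()
                .copied()
                // Protocol names are compared case-insensitively.
                .find(|s| s.eq_ignore_ascii_case(offered))
        })
}
```

The returned name comes from the server's own list, so it can be written into the Upgrade header of the 101 response without echoing client-controlled text.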

Error Handling

use hyper::{Body, Request};
use hyper::upgrade::Upgraded;

async fn handle_upgrade_errors(req: Request<Body>) {
    let upgrade = hyper::upgrade::on(req);

    match upgrade.await {
        Ok(upgraded) => {
            // Successfully upgraded
            use_upgraded_connection(upgraded).await;
        }
        Err(e) => {
            // Upgrade failed
            // Possible causes:
            // - The message was not part of an upgrade
            // - No 101 response was sent for the request
            // - The connection closed before the upgrade completed

            eprintln!("Upgrade error: {}", e);

            // No Upgraded value exists on this path, so there is
            // nothing to read from or write to
        }
    }
}
 
async fn use_upgraded_connection(upgraded: Upgraded) {
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    
    let mut io = upgraded;
    
    // The connection is no longer HTTP
    // You must handle protocol-specific errors
    
    let mut buf = [0u8; 1024];
    loop {
        match io.read(&mut buf).await {
            Ok(0) => {
                println!("Connection closed");
                break;
            }
            Ok(n) => {
                if io.write_all(&buf[..n]).await.is_err() {
                    break;
                }
            }
            Err(e) => {
                eprintln!("Read error: {}", e);
                break;
            }
        }
    }
}

Upgrade errors should be handled: when the future fails, there is no upgraded connection to use.

CONNECT Method vs Upgrade

use hyper::{Body, Method, Request, Response, StatusCode};
 
async fn connect_vs_upgrade() {
    // CONNECT is different from Upgrade:
    
    // UPGRADE: Switch protocols on existing connection
    // Client: "Upgrade this connection to WebSocket"
    // Server: "101 Switching Protocols" + then WebSocket
    
    // CONNECT: Create tunnel through proxy
    // Client: "CONNECT server.example.com:443"
    // Proxy: "200 Connection established" + then raw TCP tunnel
    
    // CONNECT is used for proxying, Upgrade is used for
    // protocol switching on the same endpoint
}
 
async fn handle_connect(mut req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
    if req.method() == Method::CONNECT {
        // CONNECT request - create tunnel
        // hyper exposes the tunnel through the same upgrade mechanism,
        // taken from the request
        let upgrade = hyper::upgrade::on(&mut req);

        tokio::spawn(async move {
            if let Ok(upgraded) = upgrade.await {
                // Tunnel data between client and target
                handle_tunnel(upgraded).await;
            }
        });

        // A 2xx response tells the client the tunnel is established
        let mut response = Response::new(Body::empty());
        *response.status_mut() = StatusCode::OK;
        Ok(response)
    } else if req.headers().contains_key("Upgrade") {
        // Upgrade request - switch protocols
        handle_upgrade(req).await
    } else {
        // Normal request
        Ok(Response::new(Body::from("Normal response")))
    }
}

// Placeholder: connect to the target and copy bytes in both directions
async fn handle_tunnel(_upgraded: hyper::upgrade::Upgraded) {}

// Placeholder: respond with 101 and switch protocols
async fn handle_upgrade(_req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
    Ok(Response::new(Body::empty()))
}

CONNECT and Upgrade are different mechanisms with different purposes.
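
The distinction comes down to which request and response pair triggers the handoff. The sketch below encodes that rule with the standard library only; `Handoff` and `classify` are hypothetical names, and the method, status, and header flag are passed in as plain values rather than hyper types.

```rust
#[derive(Debug, PartialEq)]
enum Handoff {
    /// CONNECT answered with a 2xx status: raw tunnel to another host.
    Tunnel,
    /// Upgrade answered with 101: new protocol with the same endpoint.
    ProtocolSwitch,
    /// Ordinary exchange: the connection keeps speaking HTTP.
    PlainHttp,
}

fn classify(method: &str, response_status: u16, has_upgrade_header: bool) -> Handoff {
    if method == "CONNECT" && (200..300).contains(&response_status) {
        Handoff::Tunnel
    } else if has_upgrade_header && response_status == 101 {
        Handoff::ProtocolSwitch
    } else {
        // Includes refused CONNECT or Upgrade requests (e.g. 403, 426).
        Handoff::PlainHttp
    }
}
```

In both handoff cases the server ends up with the raw connection; what differs is the status code that announces it and what the bytes afterwards mean.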

Real-World WebSocket Server

use hyper::{Body, Request, Response, StatusCode, header};
use hyper::server::Server;
use hyper::service::{make_service_fn, service_fn};
use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::tungstenite::protocol::Role;
 
type Error = Box<dyn std::error::Error + Send + Sync>;
 
async fn ws_server() {
    let addr = ([127, 0, 0, 1], 8080).into();
    
    let server = Server::bind(&addr)
        .serve(make_service_fn(|_conn| async {
            Ok::<_, Error>(service_fn(handle))
        }));
    
    println!("WebSocket server listening on {}", addr);
    
    if let Err(e) = server.await {
        eprintln!("Server error: {}", e);
    }
}
 
async fn handle(mut req: Request<Body>) -> Result<Response<Body>, Error> {
    // Check for WebSocket upgrade
    let upgrade = req.headers()
        .get(header::UPGRADE)
        .and_then(|v| v.to_str().ok())
        .map(|v| v.to_lowercase());

    if upgrade.as_deref() == Some("websocket") {
        // Validate WebSocket headers and compute the accept key up front,
        // so the borrow of `req` ends before the upgrade is taken
        let accept = {
            let key = req.headers()
                .get("Sec-WebSocket-Key")
                .ok_or("Missing Sec-WebSocket-Key")?;
            compute_accept_key(key.as_bytes())
        };

        // Take upgrade future from the request
        let on_upgrade = hyper::upgrade::on(&mut req);

        // Spawn WebSocket handler
        tokio::spawn(async move {
            if let Ok(upgraded) = on_upgrade.await {
                let ws = WebSocketStream::from_raw_socket(
                    upgraded,
                    Role::Server,
                    None,
                ).await;

                if let Err(e) = handle_websocket_connection(ws).await {
                    eprintln!("WebSocket error: {}", e);
                }
            }
        });

        // Create 101 response
        let response = Response::builder()
            .status(StatusCode::SWITCHING_PROTOCOLS)
            .header(header::UPGRADE, "websocket")
            .header(header::CONNECTION, "Upgrade")
            .header("Sec-WebSocket-Accept", accept)
            .body(Body::empty())?;

        Ok(response)
    } else {
        // Return HTML page with WebSocket client
        Ok(Response::new(Body::from(HTML_PAGE)))
    }
}
 
async fn handle_websocket_connection(
    mut ws: WebSocketStream<hyper::upgrade::Upgraded>
) -> Result<(), Error> {
    use tokio_tungstenite::tungstenite::Message;
    // next() comes from StreamExt; send() and close() come from SinkExt
    use futures_util::{SinkExt, StreamExt};
    
    while let Some(msg) = ws.next().await {
        match msg? {
            Message::Text(text) => {
                ws.send(Message::Text(format!("Echo: {}", text).into())).await?;
            }
            Message::Binary(data) => {
                ws.send(Message::Binary(data)).await?;
            }
            Message::Close(_) => {
                ws.close(None).await?;
                break;
            }
            _ => {}
        }
    }
    
    Ok(())
}
 
fn compute_accept_key(key: &[u8]) -> String {
    use sha1::{Digest, Sha1};
    use base64::Engine;
    
    let mut hasher = Sha1::new();
    hasher.update(key);
    hasher.update(b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11");
    
    base64::engine::general_purpose::STANDARD.encode(hasher.finalize())
}
 
const HTML_PAGE: &str = r#"
<!DOCTYPE html>
<html>
<body>
<script>
const ws = new WebSocket("ws://localhost:8080");
ws.onmessage = (e) => console.log("Received:", e.data);
ws.onopen = () => ws.send("Hello!");
</script>
</body>
</html>
"#;

A complete WebSocket server using on_upgrade.

Summary Table

fn summary() {
    // | Aspect         | Normal exchange          | Upgrade exchange                 |
    // |----------------|--------------------------|----------------------------------|
    // | Status         | 200, 404, etc.           | 101 Switching Protocols          |
    // | Body           | Contains data            | Empty or ignored                 |
    // | on_upgrade     | Future resolves to Err   | Future resolves to Ok(Upgraded)  |
    // | After response | Connection can be reused | Protocol switched                |
    // | Body usage     | Read body data           | Use the Upgraded IO instead      |
    
    // on_upgrade returns an OnUpgrade future (never an Option):
    // - Err when no upgrade takes place
    // - Ok(Upgraded) once the 101 response has been sent
    
    // After upgrade:
    // - HTTP is done on this connection
    // - Use Upgraded for custom protocol
    // - Cannot go back to HTTP
    
    // Common upgrade protocols:
    // - WebSocket (most common)
    // - HTTP/2 cleartext (h2c)
    // - Custom protocols
}

Synthesis

Quick reference:

use hyper::{Body, Response, StatusCode};

// Inside a handler that owns `mut req: Request<Body>`:

// Get the upgrade future from the request
// (hyper 0.13: req.into_body().on_upgrade())
let upgrade = hyper::upgrade::on(&mut req);

tokio::spawn(async move {
    match upgrade.await {
        Ok(upgraded) => {
            // Connection is now upgraded
            // Use upgraded for WebSocket, custom protocol, etc.
        }
        Err(e) => {
            eprintln!("Upgrade failed: {}", e);
        }
    }
});

// Create upgrade response
let response = Response::builder()
    .status(StatusCode::SWITCHING_PROTOCOLS)
    .header("Upgrade", "websocket")
    .header("Connection", "Upgrade")
    .body(Body::empty())?;

// Return response (hyper sends 101 headers, then upgrade resolves)
Ok(response)

Key insight: Body::on_upgrade is the bridge between HTTP and other protocols on the same TCP connection. When a server sends a 101 Switching Protocols response, it is saying "HTTP is done here, let's speak something else." The on_upgrade future resolves when this transition is complete: after the response headers are sent, but before either side speaks the new protocol. The resulting Upgraded type wraps the underlying IO (typically a TcpStream) and gives you direct access to it, no longer mediated by HTTP. This is fundamentally different from a normal exchange, where you read the body for HTTP data; in an upgrade the body carries no data, and the message only marks the point where the connection is taken over.

The critical constraint is exclusivity: on_upgrade takes the body by value, and once the upgrade completes the connection no longer speaks HTTP, so the body cannot also be read as HTTP data. on_upgrade does not return an Option; when no upgrade takes place, the future resolves to an error instead. WebSocket is the dominant use case, but any protocol switch follows the same pattern: the client sends Upgrade: websocket, the server responds 101 Switching Protocols, the server takes the upgrade from the request, the client takes it from the response, and both then speak WebSocket instead of HTTP.