What is the purpose of futures::sink::Sink::send for combining feed and flush operations?

send is a convenience method that calls feed to enqueue an item and then flush to ensure the item is transmitted, handling the asynchronous coordination of both operations in a single call. This simplifies the common pattern of writing to a sink and immediately flushing, ensuring the item is fully processed before continuing, while abstracting away the manual state management required when using feed and flush separately.

The Sink Trait Basics

use futures::sink::Sink;
use std::pin::Pin;
use std::task::{Context, Poll};
 
// The Sink trait (simplified):
// pub trait Sink<Item> {
//     type Error;
//     
//     fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) 
//         -> Poll<Result<(), Self::Error>>;
//     fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;
//     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) 
//         -> Poll<Result<(), Self::Error>>;
//     fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) 
//         -> Poll<Result<(), Self::Error>>;
// }
 
// Key methods:
// - poll_ready: Check if the sink can accept an item
// - start_send: Queue an item for sending
// - poll_flush: Flush pending items to the underlying destination
// - poll_close: Flush and close the sink

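Sink is the sending counterpart of Stream: a Stream yields values asynchronously, while a Sink accepts them asynchronously. It plays a role similar to std::io::Write, but for typed items rather than raw bytes (the byte-oriented async analogue of Write is AsyncWrite).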

Feed vs Send

use futures::sink::SinkExt;
use futures::stream::StreamExt;
 
async fn feed_vs_send() -> Result<(), Box<dyn std::error::Error>> {
    // feed: Enqueues an item but may not flush immediately
    // send: Enqueues an item AND flushes
    
    let mut sink = get_sink();
    
    // Using feed (just enqueues):
    sink.feed(item).await?;  // Enqueues item
    // Item may not be transmitted yet!
    // Need to flush separately:
    sink.flush().await?;  // Now the item is transmitted
    
    // Using send (combines both):
    sink.send(item).await?;  // Enqueues AND flushes
    // Item is guaranteed to be transmitted
    
    Ok(())
}
 
# async fn get_sink() -> futures::channel::mpsc::Sender<i32> {
#     futures::channel::mpsc::channel(16).0
# }
# let item = 42;

feed only enqueues; send enqueues and flushes.

The Feed Operation

use futures::sink::SinkExt;
 
/// Queues three items with `feed`, then pushes them out with a single `flush`.
async fn feed_example() -> Result<(), Box<dyn std::error::Error>> {
    let mut sink = get_sink();

    // Each `feed` only hands the value over to the sink; it can complete
    // before anything has actually been flushed.
    for value in 1..=3 {
        sink.feed(value).await?;
    }

    // The three values may still be buffered at this point, which lets the
    // sink batch work for efficiency.

    // One explicit flush pushes out everything queued so far.
    sink.flush().await?;

    Ok(())
}
 
# async fn get_sink() -> futures::channel::mpsc::Sender<i32> {
#     futures::channel::mpsc::channel(16).0
# }

feed is useful for batching multiple items before flushing.

The Send Operation

use futures::sink::SinkExt;
 
/// Sends three items one at a time, flushing the sink after each of them.
async fn send_example() -> Result<(), Box<dyn std::error::Error>> {
    let mut sink = get_sink();

    // `send` is feed + flush, so every iteration waits for its own flush
    // before the next value is handed over.
    for value in 1..=3 {
        sink.send(value).await?;
    }

    // Flushing per item is easy to reason about, but it gives up the
    // batching that a run of `feed` calls would allow.

    Ok(())
}
 
# async fn get_sink() -> futures::channel::mpsc::Sender<i32> {
#     futures::channel::mpsc::channel(16).0
# }

send ensures each item is flushed before continuing.

Internal Implementation

use futures::sink::SinkExt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
 
// Simplified implementation of send:
// async fn send(&mut self, item: Item) -> Result<(), Self::Error> {
//     self.feed(item).await?;  // Enqueue the item
//     self.flush().await?;     // Flush to ensure transmission
//     Ok(())
// }
 
// More detailed expansion:
async fn send_explicit<S, Item>(sink: &mut S, item: Item) -> Result<(), S::Error>
where
    S: Sink<Item> + Unpin,
{
    // Step 1: Feed the item
    sink.feed(item).await?;
    
    // Step 2: Flush to ensure transmission
    sink.flush().await?;
    
    Ok(())
}
 
// send is essentially feed followed by flush
// But composed into a single future for convenience

send is feed followed by flush, composed into one operation.

When to Use Feed

use futures::sink::SinkExt;
 
/// Runs the same 100-item workload twice to contrast per-item flushing with
/// a single flush at the end.
async fn batching_with_feed() -> Result<(), Box<dyn std::error::Error>> {
    let mut sink = get_sink();

    // Approach A: `send` per item, so the sink is flushed 100 times.
    for n in 0..100 {
        sink.send(n).await?;
    }

    // Approach B: `feed` per item, followed by exactly one flush.
    for n in 0..100 {
        sink.feed(n).await?;
    }
    sink.flush().await?;

    // For bulk writes, approach B avoids 99 redundant flushes.

    Ok(())
}
 
# async fn get_sink() -> futures::channel::mpsc::Sender<i32> {
#     futures::channel::mpsc::channel(16).0
# }

Use feed for batching multiple items; send for individual items.

When to Use Send

use futures::sink::SinkExt;
 
/// Walks through the situations where `send` (feed + flush) is the better fit.
///
/// # Errors
/// Propagates sink errors, except for the one deliberately handled in case 3.
async fn send_use_cases() -> Result<(), Box<dyn std::error::Error>> {
    let mut sink = get_sink();
    // Bound locally: a `fn` item cannot capture a `let` binding from an
    // enclosing scope, so the value has to live inside this function.
    let item = 3;

    // Use send when:

    // 1. The item must be flushed before the code below runs
    sink.send(important_message()).await?;
    // The message has been enqueued and the sink flushed before this line

    // 2. Individual items should not linger in a buffer
    sink.send(status_update()).await?;

    // 3. Error handling requires knowing whether a specific item went out
    if let Err(e) = sink.send(item).await {
        // Enqueueing or flushing this specific item failed
        handle_error(e);
    }

    // 4. Single-item operations
    sink.send(final_item()).await?;
    sink.close().await?; // Close after final item

    Ok(())
}
 
# async fn get_sink() -> futures::channel::mpsc::Sender<i32> {
#     futures::channel::mpsc::channel(16).0
# }
# fn important_message() -> i32 { 1 }
# fn status_update() -> i32 { 2 }
# let item = 3;
# fn final_item() -> i32 { 4 }
# fn handle_error<T>(_: T) {}

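Use send when each item must be flushed before the next step runs. Note that a completed flush means the sink has no items left pending; it is not by itself an acknowledgement from the receiving side.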

Combining Feed and Send

use futures::sink::SinkExt;
 
/// Interleaves batched `feed` calls with one `send` to show that the flush
/// inside `send` also covers the items queued before it.
async fn mixed_operations() -> Result<(), Box<dyn std::error::Error>> {
    let mut sink = get_sink();

    // First batch: queued only, nothing flushed yet.
    for n in 0..10 {
        sink.feed(n).await?;
    }

    // `send` flushes the whole sink, so items 0-9 go out together with the
    // critical item rather than just the critical item alone.
    sink.send(critical_item()).await?;

    // Second batch: queued again without flushing.
    for n in 10..20 {
        sink.feed(n).await?;
    }

    // Push out the second batch.
    sink.flush().await?;

    Ok(())
}
 
# async fn get_sink() -> futures::channel::mpsc::Sender<i32> {
#     futures::channel::mpsc::channel(16).0
# }
# fn critical_item() -> i32 { 999 }

send flushes all pending items, including those enqueued with feed.

Buffered vs Unbuffered Sinks

use futures::sink::SinkExt;
 
/// Feeds 16 items into a bounded mpsc channel created with `channel(16)` to
/// illustrate how a buffered sink absorbs `feed` calls.
async fn buffered_sinks() -> Result<(), Box<dyn std::error::Error>> {
    // Buffered sinks (like mpsc::Sender):
    // - Have an internal buffer
    // - feed completes without waiting while there is spare capacity
    // - flush waits until the sink reports nothing left pending
    
    // `_rx` is bound (not discarded) so the receiving half stays alive for
    // the duration of the function and the channel remains connected.
    let (mut tx, _rx) = futures::channel::mpsc::channel(16);
    
    // feed fills the buffer
    for i in 0..16 {
        tx.feed(i).await?;  // Completes without waiting (capacity available)
    }
    
    // NOTE(review): the futures docs give the capacity of `channel(n)` as
    // `n + number of senders`, so with a single sender one more item should
    // fit before a feed has to wait - confirm for the futures version in use.
    // Once capacity is exhausted, further feeds wait for the receiver.
    
    // Unbuffered sinks (like some network sinks):
    // - May not have internal buffers
    // - feed still hands the item over, but flush matters more
    // - send flushes right after handing the item over
    
    Ok(())
}

Buffered sinks benefit more from batching with feed.

Error Handling Differences

use futures::sink::SinkExt;
 
async fn error_handling() -> Result<(), Box<dyn std::error::Error>> {
    let mut sink = get_sink();
    
    // send error handling
    match sink.send(item()).await {
        Ok(()) => {
            // Item was definitely sent and flushed
        }
        Err(e) => {
            // Item may or may not have been sent
            // - feed could have succeeded
            // - flush could have failed
            // The error could come from either operation
        }
    }
    
    // Separate feed + flush gives more control
    sink.feed(item()).await?;  // Error from feed
    sink.flush().await?;        // Error from flush
    
    // You know which operation failed
    
    Ok(())
}
 
# async fn get_sink() -> futures::channel::mpsc::Sender<i32> {
#     futures::channel::mpsc::channel(16).0
# }
# fn item() -> i32 { 42 }

Separate feed and flush allow distinguishing which operation failed.

Backpressure Handling

use futures::sink::SinkExt;
 
/// Demonstrates backpressure on a small bounded channel whose receiver is
/// kept alive (`_rx`) but never read from, so nothing ever leaves the channel.
async fn backpressure() -> Result<(), Box<dyn std::error::Error>> {
    let (mut tx, _rx) = futures::channel::mpsc::channel(2);  // Small buffer
    
    // send respects backpressure
    // It will wait if the sink is full
    tx.send(1).await?;  // Buffer: [1]
    tx.send(2).await?;  // Buffer: [1, 2]
    // tx.send(3).await?;  // Would wait until buffer has space
    // (nothing reads from `_rx` here, so that wait would never end)
    
    // feed also respects backpressure
    // It will wait if the sink can't accept more items
    // NOTE(review): the futures docs give the capacity of `channel(n)` as
    // `n + number of senders`; presumably that extra per-sender slot is why
    // this third item can still be enqueued without a reader - confirm.
    tx.feed(3).await?;  // Waits for space (if buffer full)
    
    // The difference is when flush happens:
    // - send: waits for flush to complete
    // - feed: waits for space, returns after enqueue
    
    Ok(())
}

Both send and feed respect backpressure; send also waits for flush.

Complete Example: Network Protocol

use futures::sink::SinkExt;
use futures::stream::StreamExt;
 
async fn protocol_example() -> Result<(), Box<dyn std::error::Error>> {
    let mut connection = get_connection();
    
    // Protocol message types
    enum Message {
        Header { version: u8, flags: u8 },
        Data(Vec<u8>),
        Trailer { checksum: u32 },
    }
    
    // Sending a structured message efficiently:
    
    // Batch header and data with feed
    connection.feed(Message::Header { version: 1, flags: 0 }).await?;
    connection.feed(Message::Data(b"payload".to_vec())).await?;
    
    // Send trailer with flush (ensures complete transmission)
    connection.send(Message::Trailer { checksum: 0x12345678 }).await?;
    
    // The complete message is now sent
    // Using send for the final piece ensures flush
    
    Ok(())
}
 
# async fn get_connection() -> futures::channel::mpsc::Sender<Message> {
#     futures::channel::mpsc::channel(16).0
# }
# enum Message {
#     Header { version: u8, flags: u8 },
#     Data(Vec<u8>),
#     Trailer { checksum: u32 },
# }

Combine feed for batch items and send for final items.

Flush Semantics

use futures::sink::SinkExt;
 
/// Illustrates what is (and is not) settled before and after a `flush`.
async fn flush_semantics() -> Result<(), Box<dyn std::error::Error>> {
    let mut sink = get_sink();

    // `flush` completes once the sink reports that none of the items it
    // accepted are still pending. What "pending" means is up to the sink:
    // an in-memory buffer, an outstanding write, and so on.

    for value in [1, 2] {
        sink.feed(value).await?;
    }

    // After the feeds alone, both values may still sit in internal buffers;
    // they have not necessarily reached the destination.

    sink.flush().await?;

    // After the flush, the sink has nothing left buffered.

    // `send` bundles the same two steps for a single item:
    //   1. feed(item) - hand the item to the sink
    //   2. flush()    - wait until nothing is pending
    sink.send(3).await?;

    Ok(())
}
 
# async fn get_sink() -> futures::channel::mpsc::Sender<i32> {
#     futures::channel::mpsc::channel(16).0
# }

flush ensures all buffered data is transmitted before returning.

Closing the Sink

use futures::sink::SinkExt;
 
/// Queues two items and then closes the sink, relying on `close` to flush.
async fn close_sink() -> Result<(), Box<dyn std::error::Error>> {
    let mut sink = get_sink();

    // `close` is to shutting down what `send` is to a single item:
    // it flushes first and then closes.

    for value in [1, 2] {
        sink.feed(value).await?;
    }

    // Flushes whatever is still queued, then closes the sink.
    sink.close().await?;

    // In effect: `sink.flush().await?` followed by marking the sink as
    // finished. No further items can be sent once `close` has completed.

    Ok(())
}
 
# async fn get_sink() -> futures::channel::mpsc::Sender<i32> {
#     futures::channel::mpsc::channel(16).0
# }

close flushes pending items and signals completion.

Practical Pattern: Batch then Send

use futures::sink::SinkExt;
 
/// Feeds every item except the last, then `send`s the last one so that a
/// single flush covers the whole batch.
///
/// # Errors
/// Propagates the first error reported by the sink.
async fn batch_pattern() -> Result<(), Box<dyn std::error::Error>> {
    let mut sink = get_sink();

    // Pattern: Batch with feed, finalize with send
    let items = vec![1, 2, 3, 4, 5];

    // `split_last` yields `None` for an empty list, so this pattern stays
    // panic-free if `items` is ever empty (`len() - 1` and `last().unwrap()`
    // would both panic in that case).
    if let Some((&last, rest)) = items.split_last() {
        // Batch all but last
        for &item in rest {
            sink.feed(item).await?;
        }

        // Send last item, which flushes everything
        sink.send(last).await?;
    }

    // This pattern:
    // - Minimizes flushes (only one)
    // - Flushes every queued item (via send's flush)

    Ok(())
}
 
# async fn get_sink() -> futures::channel::mpsc::Sender<i32> {
#     futures::channel::mpsc::channel(16).0
# }

Batch with feed, finalize with send for efficient delivery.

Synthesis

Quick reference:

| Method | Behavior | When to Use |
|---|---|---|
| feed(item) | Enqueues item, may not flush | Batching multiple items |
| flush() | Drains all queued items | After batch of feeds |
| send(item) | feed(item) + flush() | Single items, immediate delivery |
| close() | flush() + signal completion | Final item, closing connection |

Common patterns:

use futures::sink::SinkExt;
 
async fn patterns() -> Result<(), Box<dyn std::error::Error>> {
    let mut sink = get_sink();
    
    // Pattern 1: Single important item
    sink.send(important_item()).await?;
    
    // Pattern 2: Batch items efficiently
    for item in items {
        sink.feed(item).await?;
    }
    sink.flush().await?;
    
    // Pattern 3: Batch with send as final item
    for item in items.iter().take(items.len() - 1) {
        sink.feed(*item).await?;
    }
    sink.send(*items.last().unwrap()).await?;
    
    // Pattern 4: Stream to sink
    while let Some(item) = stream.next().await {
        sink.send(item).await?;  // Each item flushed immediately
    }
    
    Ok(())
}
 
# async fn get_sink() -> futures::channel::mpsc::Sender<i32> {
#     futures::channel::mpsc::channel(16).0
# }
# fn important_item() -> i32 { 42 }
# let items = vec![1, 2, 3];
# let mut stream = futures::stream::iter(vec![1i32, 2, 3]);
# use futures::stream::StreamExt;

Key insight: Sink::send is a convenience method that combines feed and flush into a single operation. feed(item) enqueues an item for sending but may return before the item is transmitted—it queues the item in the sink's internal buffer for efficiency. flush() forces the sink to transmit all queued items and wait for completion. send(item) does both: it calls feed(item) followed by flush(), ensuring the item is enqueued and transmitted before returning. The distinction matters for batching: using send in a loop flushes after each item (multiple flushes), while using feed in a loop followed by a single flush batches items for efficiency (one flush). Use send when you need to ensure an individual item is delivered before continuing (important messages, single-item operations, or when ordering guarantees matter). Use feed + flush when you have multiple items to send and can batch them for efficiency. send is the right default choice for simplicity; feed is the optimization for batch operations. Note that send flushes everything pending, not just its own item—if you feed multiple items and then send one more, all queued items are flushed together.