What is the purpose of futures::sink::SinkExt::send for combining feed and flush operations?
send is a convenience method on the SinkExt extension trait that calls feed to enqueue an item and then flush to push it out of the sink's buffers, coordinating both asynchronous steps in a single call. This simplifies the common pattern of writing to a sink and flushing immediately: the returned future completes only once the item has been flushed, and you avoid sequencing feed and flush by hand.
The Sink Trait Basics
use futures::sink::Sink;
use std::pin::Pin;
use std::task::{Context, Poll};
// The Sink trait (simplified):
// pub trait Sink<Item> {
// type Error;
//
// fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>)
// -> Poll<Result<(), Self::Error>>;
// fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;
// fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>)
// -> Poll<Result<(), Self::Error>>;
// fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>)
// -> Poll<Result<(), Self::Error>>;
// }
// Key methods:
// - poll_ready: Check if the sink can accept an item
// - start_send: Queue an item for sending
// - poll_flush: Flush pending items to the underlying destination
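// - poll_close: Flush and close the sink
Sink is the asynchronous counterpart of std::io::Write: it accepts typed items rather than bytes, and buffered items may not reach the destination until the sink is flushed.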
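The comparison with std::io::Write can be made concrete with a synchronous sketch. It uses only the standard library's BufWriter, not the futures crate, and the function name is made up for illustration: write_all plays the role of feed, and flush plays the role of flush.

```rust
use std::io::{BufWriter, Write};

/// Returns how many bytes had reached the underlying writer
/// before and after the explicit flush.
fn bytes_before_and_after_flush() -> std::io::Result<(usize, usize)> {
    // A Vec<u8> stands in for the real destination (file, socket, ...).
    let mut writer = BufWriter::with_capacity(64, Vec::new());

    // Like feed: the bytes are accepted but only sit in BufWriter's buffer.
    writer.write_all(b"abc")?;
    let before = writer.get_ref().len();

    // Like flush: the buffer is pushed into the underlying writer.
    writer.flush()?;
    let after = writer.get_ref().len();

    Ok((before, after))
}

fn main() -> std::io::Result<()> {
    let (before, after) = bytes_before_and_after_flush()?;
    println!("before flush: {before} bytes, after flush: {after} bytes");
    Ok(())
}
```

The three bytes fit in the 64-byte buffer, so nothing reaches the Vec until flush is called. A buffering Sink behaves analogously with items instead of bytes.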
Feed vs Send
use futures::sink::SinkExt;
async fn feed_vs_send() -> Result<(), Box<dyn std::error::Error>> {
// feed: Enqueues an item but may not flush immediately
// send: Enqueues an item AND flushes
let mut sink = get_sink();
let item = 42; // example value; i32 is Copy, so it can be sent twice
// Using feed (just enqueues):
sink.feed(item).await?; // Enqueues item
// Item may still be sitting in the sink's buffer!
// Need to flush separately:
sink.flush().await?; // Now the item has been flushed
// Using send (combines both):
sink.send(item).await?; // Enqueues AND flushes
// Item has been enqueued and flushed
Ok(())
}
# // Placeholder sink for the snippets; real code must keep the receiver
# // alive, otherwise sends fail with a disconnected error
# fn get_sink() -> futures::channel::mpsc::Sender<i32> {
# futures::channel::mpsc::channel(16).0
# }
feed only enqueues; send enqueues and flushes.
The Feed Operation
use futures::sink::SinkExt;
async fn feed_example() -> Result<(), Box<dyn std::error::Error>> {
let mut sink = get_sink();
// feed enqueues an item for sending
// It may return immediately without flushing
sink.feed(1).await?;
sink.feed(2).await?;
sink.feed(3).await?;
// Items are queued but may not be sent yet
// The sink may buffer for efficiency
// Explicit flush to ensure transmission
sink.flush().await?;
// Now all items are guaranteed to be sent
Ok(())
}
# fn get_sink() -> futures::channel::mpsc::Sender<i32> {
# futures::channel::mpsc::channel(16).0
# }
feed is useful for batching multiple items before flushing.
The Send Operation
use futures::sink::SinkExt;
async fn send_example() -> Result<(), Box<dyn std::error::Error>> {
let mut sink = get_sink();
// send combines feed + flush
// Each call ensures the item is transmitted before returning
sink.send(1).await?; // Item 1 is transmitted
sink.send(2).await?; // Item 2 is transmitted
sink.send(3).await?; // Item 3 is transmitted
// Each send waits for flush to complete
// More predictable but potentially less efficient
Ok(())
}
# fn get_sink() -> futures::channel::mpsc::Sender<i32> {
# futures::channel::mpsc::channel(16).0
# }
send ensures each item is flushed before continuing.
Internal Implementation
use futures::sink::{Sink, SinkExt};
// Simplified implementation of send:
// async fn send(&mut self, item: Item) -> Result<(), Self::Error> {
// self.feed(item).await?; // Enqueue the item
// self.flush().await?; // Flush to ensure transmission
// Ok(())
// }
// More detailed expansion:
async fn send_explicit<S, Item>(sink: &mut S, item: Item) -> Result<(), S::Error>
where
S: Sink<Item> + Unpin,
{
// Step 1: Feed the item
sink.feed(item).await?;
// Step 2: Flush to ensure transmission
sink.flush().await?;
Ok(())
}
// send is essentially feed followed by flush,
// composed into a single future for convenience
send is feed followed by flush, composed into one operation.
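To see which trait methods a single send ends up driving, here is a hand-rolled sketch that uses only the standard library. MiniSink, Recorder, NoopWake and send_once are hypothetical names invented for this illustration. MiniSink is a simplified stand-in for the real Sink trait (no Pin, no poll_close), and the toy sink never returns Pending, so one pass is enough.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical, simplified stand-in for the Sink trait (no Pin, no close).
trait MiniSink<Item> {
    type Error;
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    fn start_send(&mut self, item: Item) -> Result<(), Self::Error>;
    fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
}

/// Toy sink that is always ready and records which methods were called.
#[derive(Default)]
struct Recorder {
    calls: Vec<&'static str>,
    buffered: Vec<i32>,
    delivered: Vec<i32>,
}

impl MiniSink<i32> for Recorder {
    type Error = ();

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
        self.calls.push("poll_ready");
        Poll::Ready(Ok(()))
    }

    fn start_send(&mut self, item: i32) -> Result<(), ()> {
        self.calls.push("start_send");
        self.buffered.push(item); // queued, not yet delivered
        Ok(())
    }

    fn poll_flush(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
        self.calls.push("poll_flush");
        self.delivered.append(&mut self.buffered); // drain the buffer
        Poll::Ready(Ok(()))
    }
}

/// Waker that does nothing; enough to build a Context for this demo.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Drives one "send" by hand: the feed part, then the flush part.
fn send_once(sink: &mut Recorder, item: i32) -> Result<(), ()> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    // feed: wait for readiness, then hand the item over.
    match sink.poll_ready(&mut cx) {
        Poll::Ready(result) => result?,
        Poll::Pending => unreachable!("Recorder is always ready"),
    }
    sink.start_send(item)?;

    // flush: push everything that is queued to the destination.
    match sink.poll_flush(&mut cx) {
        Poll::Ready(result) => result,
        Poll::Pending => unreachable!("Recorder flushes immediately"),
    }
}

fn main() {
    let mut sink = Recorder::default();
    send_once(&mut sink, 7).expect("toy sink never fails");
    println!("{:?}", sink.calls); // ["poll_ready", "start_send", "poll_flush"]
    println!("{:?}", sink.delivered); // [7]
}
```

A real sink may return Poll::Pending from poll_ready or poll_flush. In that case the future returned by send is polled again once the waker fires, rather than finishing in one pass.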
When to Use Feed
use futures::sink::SinkExt;
async fn batching_with_feed() -> Result<(), Box<dyn std::error::Error>> {
let mut sink = get_sink();
// When sending multiple items, feed is more efficient:
// Option 1: Using send (multiple flushes)
for item in 0..100 {
sink.send(item).await?; // Flushes after EACH item
}
// 100 flushes!
// Option 2: Using feed (single flush)
for item in 0..100 {
sink.feed(item).await?; // Just enqueues
}
sink.flush().await?; // Single flush at the end
// 1 flush total
// Option 2 is more efficient for batch operations
Ok(())
}
# fn get_sink() -> futures::channel::mpsc::Sender<i32> {
# futures::channel::mpsc::channel(16).0
# }
Use feed for batching multiple items; send for individual items.
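The flush counts claimed above can be checked with a small synchronous model. CountingSink and the two helper functions are hypothetical and only mimic the feed/flush/send relationship; they are not part of the futures crate.

```rust
/// Hypothetical synchronous model of a buffering sink; not the futures API.
#[derive(Default)]
struct CountingSink {
    queued: Vec<u32>,
    delivered: Vec<u32>,
    flushes: usize,
}

impl CountingSink {
    /// Enqueue only.
    fn feed(&mut self, item: u32) {
        self.queued.push(item);
    }

    /// Move everything queued to the destination and count the flush.
    fn flush(&mut self) {
        self.flushes += 1;
        self.delivered.append(&mut self.queued);
    }

    /// feed followed by flush, as send does.
    fn send(&mut self, item: u32) {
        self.feed(item);
        self.flush();
    }
}

/// Returns (number of flushes, number of delivered items) when using send.
fn flushes_with_send(n: u32) -> (usize, usize) {
    let mut sink = CountingSink::default();
    for item in 0..n {
        sink.send(item);
    }
    (sink.flushes, sink.delivered.len())
}

/// Returns (number of flushes, number of delivered items) when batching.
fn flushes_with_feed(n: u32) -> (usize, usize) {
    let mut sink = CountingSink::default();
    for item in 0..n {
        sink.feed(item);
    }
    sink.flush();
    (sink.flushes, sink.delivered.len())
}

fn main() {
    println!("send loop:    {:?}", flushes_with_send(100)); // (100, 100)
    println!("feed + flush: {:?}", flushes_with_feed(100)); // (1, 100)
}
```

Both strategies deliver all 100 items; they differ only in how often the comparatively expensive flush step runs.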
When to Use Send
use futures::sink::SinkExt;
async fn send_use_cases() -> Result<(), Box<dyn std::error::Error>> {
let mut sink = get_sink();
// Use send when:
// 1. You need to ensure an item is transmitted before continuing
sink.send(important_message()).await?;
// Important: message is definitely sent before next line
// 2. Individual items need immediate delivery
sink.send(status_update()).await?;
// 3. Error handling requires knowing if specific item was sent
let item = 3;
if let Err(e) = sink.send(item).await {
// This specific send failed (during feed or during flush)
handle_error(e);
}
// 4. Single-item operations
sink.send(final_item()).await?;
sink.close().await?; // Close after final item
Ok(())
}
# fn get_sink() -> futures::channel::mpsc::Sender<i32> {
# futures::channel::mpsc::channel(16).0
# }
# fn important_message() -> i32 { 1 }
# fn status_update() -> i32 { 2 }
# fn final_item() -> i32 { 4 }
# fn handle_error<T>(_: T) {}
Use send when an item must be flushed before the program continues.
Combining Feed and Send
use futures::sink::SinkExt;
async fn mixed_operations() -> Result<(), Box<dyn std::error::Error>> {
let mut sink = get_sink();
// Batch some items with feed
for item in 0..10 {
sink.feed(item).await?;
}
// Send a critical item immediately
sink.send(critical_item()).await?; // Flushes batch + critical item
// Continue with more batched items
for item in 10..20 {
sink.feed(item).await?;
}
// Final flush
sink.flush().await?;
// Note: send flushes everything pending, not just its item
// The critical_item flush includes items 0-9 as well
Ok(())
}
# fn get_sink() -> futures::channel::mpsc::Sender<i32> {
# futures::channel::mpsc::channel(16).0
# }
# fn critical_item() -> i32 { 999 }
send flushes all pending items, including those enqueued with feed.
Buffered vs Unbuffered Sinks
use futures::sink::SinkExt;
async fn buffered_sinks() -> Result<(), Box<dyn std::error::Error>> {
// Buffered sinks (like mpsc::Sender):
// - Have an internal buffer
// - feed may return immediately if buffer has space
// - flush forces buffer to drain
let (mut tx, _rx) = futures::channel::mpsc::channel(16);
// feed fills the buffer
for i in 0..16 {
tx.feed(i).await?; // Returns immediately (buffer has space)
}
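// For futures' mpsc channel the capacity is buffer + number of senders
// (16 + 1 here), so one more item fits before feed has to wait
// for the receiver to make room
// Use flush when buffered items must be pushed out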
// Unbuffered sinks (like some network sinks):
// - May not have internal buffers
// - feed still queues, but flush is more important
// - send ensures immediate transmission
Ok(())
}
Buffered sinks benefit more from batching with feed.
Error Handling Differences
use futures::sink::SinkExt;
async fn error_handling() -> Result<(), Box<dyn std::error::Error>> {
let mut sink = get_sink();
// send error handling
match sink.send(item()).await {
Ok(()) => {
// Item was definitely sent and flushed
}
Err(e) => {
// Item may or may not have been sent
// - feed could have succeeded
// - flush could have failed
// The error could come from either operation
}
}
// Separate feed + flush gives more control
sink.feed(item()).await?; // Error from feed
sink.flush().await?; // Error from flush
// You know which operation failed
Ok(())
}
# fn get_sink() -> futures::channel::mpsc::Sender<i32> {
# futures::channel::mpsc::channel(16).0
# }
# fn item() -> i32 { 42 }
Separate feed and flush allow distinguishing which operation failed.
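One way to keep the convenience of a single call while still learning which step failed is to tag the error with its stage. The sketch below is a synchronous model with made-up names (Stage, FlakySink, send_tagged); with a real sink the same idea would apply map_err to the results of feed(...).await and flush().await.

```rust
/// Which half of the combined operation failed.
#[derive(Debug, PartialEq)]
enum Stage {
    Feed,
    Flush,
}

/// Hypothetical sink whose failures can be switched on for the demo.
struct FlakySink {
    fail_feed: bool,
    fail_flush: bool,
    queued: Vec<i32>,
}

impl FlakySink {
    fn feed(&mut self, item: i32) -> Result<(), String> {
        if self.fail_feed {
            return Err("sink is not accepting items".to_string());
        }
        self.queued.push(item);
        Ok(())
    }

    fn flush(&mut self) -> Result<(), String> {
        if self.fail_flush {
            return Err("destination unavailable".to_string());
        }
        self.queued.clear(); // pretend the items were written out
        Ok(())
    }
}

/// feed then flush, with the error tagged by the stage that produced it.
fn send_tagged(sink: &mut FlakySink, item: i32) -> Result<(), (Stage, String)> {
    sink.feed(item).map_err(|e| (Stage::Feed, e))?;
    sink.flush().map_err(|e| (Stage::Flush, e))
}

fn main() {
    let mut sink = FlakySink { fail_feed: false, fail_flush: true, queued: Vec::new() };
    match send_tagged(&mut sink, 1) {
        Ok(()) => println!("sent and flushed"),
        // The item was accepted but is still queued when the flush fails.
        Err((stage, err)) => println!("{stage:?} failed: {err}; queued = {:?}", sink.queued),
    }
}
```

When the flush stage fails, the item has already been accepted and is still queued. That is exactly the "may or may not have been sent" case a plain send error cannot distinguish.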
Backpressure Handling
use futures::sink::SinkExt;
async fn backpressure() -> Result<(), Box<dyn std::error::Error>> {
let (mut tx, _rx) = futures::channel::mpsc::channel(2); // Small buffer
// send respects backpressure
// It will wait if the sink is full
tx.send(1).await?; // Buffer: [1]
tx.send(2).await?; // Buffer: [1, 2]
// tx.send(3).await?; // Would wait until buffer has space
// feed also respects backpressure
// It will wait if the sink can't accept more items
tx.feed(3).await?; // Waits for space (if buffer full)
// The difference is when flush happens:
// - send: waits for flush to complete
// - feed: waits for space, returns after enqueue
Ok(())
}
Both send and feed respect backpressure; send also waits for flush.
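Backpressure is easier to observe with a non-blocking probe. The sketch below uses the standard library's bounded sync_channel as a synchronous analog, not the futures channel; its capacity rule differs (no extra slot per sender), and the function name is invented for illustration. try_send reports a full buffer instead of waiting, which shows the point at which an asynchronous feed or send would suspend.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Returns (third item rejected while full, third item accepted after a recv).
fn backpressure_demo() -> (bool, bool) {
    // Bounded channel holding at most two items.
    let (tx, rx) = sync_channel::<i32>(2);

    tx.try_send(1).unwrap();
    tx.try_send(2).unwrap();

    // The buffer is full: a blocking send would wait here,
    // while try_send hands the item back inside the error.
    let rejected = matches!(tx.try_send(3), Err(TrySendError::Full(3)));

    // The receiver takes one item, which frees a slot.
    assert_eq!(rx.recv().unwrap(), 1);
    let accepted = tx.try_send(3).is_ok();

    (rejected, accepted)
}

fn main() {
    let (rejected, accepted) = backpressure_demo();
    println!("rejected while full: {rejected}, accepted after recv: {accepted}");
}
```

In the asynchronous case the sender is suspended rather than rejected, and it resumes once the receiver has made room.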
Complete Example: Network Protocol
use futures::sink::SinkExt;
// Protocol message types
enum Message {
Header { version: u8, flags: u8 },
Data(Vec<u8>),
Trailer { checksum: u32 },
}
async fn protocol_example() -> Result<(), Box<dyn std::error::Error>> {
let mut connection = get_connection();
// Sending a structured message efficiently:
// Batch header and data with feed
connection.feed(Message::Header { version: 1, flags: 0 }).await?;
connection.feed(Message::Data(b"payload".to_vec())).await?;
// Send trailer with flush (ensures complete transmission)
connection.send(Message::Trailer { checksum: 0x12345678 }).await?;
// The complete message has now been flushed
// Using send for the final piece triggers the flush
Ok(())
}
# fn get_connection() -> futures::channel::mpsc::Sender<Message> {
# futures::channel::mpsc::channel(16).0
# }
Combine feed for batch items and send for final items.
Flush Semantics
use futures::sink::SinkExt;
async fn flush_semantics() -> Result<(), Box<dyn std::error::Error>> {
let mut sink = get_sink();
// What flush does:
// 1. Pushes all queued items out of the sink's buffers
// 2. Waits for the underlying writes to complete
// 3. Returns once no buffered items remain
// (receipt by a remote peer is not implied)
sink.feed(1).await?;
sink.feed(2).await?;
// At this point, items may still be in internal buffers
// They haven't necessarily reached the destination
sink.flush().await?;
// Now the sink's buffers are drained and the writes have completed
// send does all of this:
// 1. feed(item) - queue the item
// 2. flush() - ensure everything is sent
sink.send(3).await?; // Item 3 is definitely sent
Ok(())
}
# fn get_sink() -> futures::channel::mpsc::Sender<i32> {
# futures::channel::mpsc::channel(16).0
# }
flush ensures all buffered data is transmitted before returning.
Closing the Sink
use futures::sink::SinkExt;
async fn close_sink() -> Result<(), Box<dyn std::error::Error>> {
let mut sink = get_sink();
// close flushes any remaining items and then closes the sink,
// much as send pairs feed with flush
sink.feed(1).await?;
sink.feed(2).await?;
// close flushes remaining items and closes
sink.close().await?;
// Roughly equivalent to sink.flush().await? followed by shutting the
// sink down; afterwards the sink must not be used to send new items
Ok(())
}
# fn get_sink() -> futures::channel::mpsc::Sender<i32> {
# futures::channel::mpsc::channel(16).0
# }
close flushes pending items and signals completion.
Practical Pattern: Batch then Send
use futures::sink::SinkExt;
async fn batch_pattern() -> Result<(), Box<dyn std::error::Error>> {
let mut sink = get_sink();
// Pattern: Batch with feed, finalize with send
let items = vec![1, 2, 3, 4, 5];
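// split_last returns None for an empty slice, so there is no
// len() - 1 underflow to guard against
if let Some((last, rest)) = items.split_last() {
// Batch all but the last item
for item in rest {
sink.feed(*item).await?;
}
// Send the last item, which flushes everything
sink.send(*last).await?;
}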
// This pattern:
// - Minimizes flushes (only one)
// - Ensures all items are sent (via send's flush)
Ok(())
}
# fn get_sink() -> futures::channel::mpsc::Sender<i32> {
# futures::channel::mpsc::channel(16).0
# }
Batch with feed, finalize with send for efficient delivery.
Synthesis
Quick reference:
| Method | Behavior | When to Use |
|---|---|---|
| feed(item) | Enqueues item, may not flush | Batching multiple items |
| flush() | Drains all queued items | After batch of feeds |
| send(item) | feed(item) + flush() | Single items, immediate delivery |
| close() | flush() + signal completion | Final item, closing connection |
Common patterns:
use futures::sink::SinkExt;
use futures::stream::StreamExt;
async fn patterns() -> Result<(), Box<dyn std::error::Error>> {
let mut sink = get_sink();
let items = vec![1, 2, 3];
let mut stream = futures::stream::iter(vec![1i32, 2, 3]);
// Pattern 1: Single important item
sink.send(important_item()).await?;
// Pattern 2: Batch items efficiently
for item in &items {
sink.feed(*item).await?;
}
sink.flush().await?;
// Pattern 3: Batch with send as final item
if let Some((last, rest)) = items.split_last() {
for item in rest {
sink.feed(*item).await?;
}
sink.send(*last).await?;
}
// Pattern 4: Stream to sink
while let Some(item) = stream.next().await {
sink.send(item).await?; // Each item flushed immediately
}
Ok(())
}
# fn get_sink() -> futures::channel::mpsc::Sender<i32> {
# futures::channel::mpsc::channel(16).0
# }
# fn important_item() -> i32 { 42 }
Key insight: send (provided by SinkExt) is a convenience method that combines feed and flush into a single operation. feed(item) enqueues an item but may return before the item is transmitted; the item waits in the sink's internal buffer for efficiency. flush() forces the sink to push out everything queued and waits for that to complete. send(item) does both: it calls feed(item) followed by flush(), so the item is enqueued and flushed before the call returns.
The distinction matters for batching: using send in a loop flushes after each item (multiple flushes), while using feed in a loop followed by a single flush batches items for efficiency (one flush). Use send when an individual item must be flushed before continuing (important messages, single-item operations, or when later work depends on the item having gone out). Use feed + flush when you have multiple items to send and can batch them. send is the right default choice for simplicity; feed is the optimization for batch operations. Note that send flushes everything pending, not just its own item: if you feed multiple items and then send one more, all queued items are flushed together.
