Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 62 additions & 24 deletions src/proto/h2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ pin_project! {
{
body_tx: SendStream<SendBuf<S::Data>>,
data_done: bool,
// A data chunk that has been polled from the body but is still waiting
// for stream-level capacity before it can be shipped. Stored here so
// it survives across `Poll::Pending` returns from `poll_capacity`; if
// we left the chunk in a local, it would be dropped on every repoll.
buffered_data: Option<(S::Data, bool)>,
#[pin]
stream: S,
}
Expand All @@ -101,6 +106,7 @@ where
PipeToSendStream {
body_tx: tx,
data_done: false,
buffered_data: None,
stream,
}
}
Expand All @@ -121,13 +127,23 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut me = self.project();
loop {
// we don't have the next chunk of data yet, so just reserve 1 byte to make
// sure there's some capacity available. h2 will handle the capacity management
// for the actual body chunk.
me.body_tx.reserve_capacity(1);
// Register for RST_STREAM notification while we wait for the next
// body chunk or for send capacity, so the task wakes up if the
// peer resets the stream.
if let Poll::Ready(reason) = me
.body_tx
.poll_reset(cx)
.map_err(crate::Error::new_body_write)?
{
debug!("stream received RST_STREAM: {:?}", reason);
return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(reason))));
}

if me.body_tx.capacity() == 0 {
loop {
// If a previously-polled chunk is still waiting for stream-level
// send capacity, drive that to completion before touching the
// body again.
if me.buffered_data.is_some() {
while me.body_tx.capacity() == 0 {
match ready!(me.body_tx.poll_capacity(cx)) {
Some(Ok(0)) => {}
Some(Ok(_)) => break,
Expand All @@ -142,34 +158,56 @@ where
}
}
}
} else if let Poll::Ready(reason) = me
.body_tx
.poll_reset(cx)
.map_err(crate::Error::new_body_write)?
{
debug!("stream received RST_STREAM: {:?}", reason);
return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(reason))));

let (chunk, is_eos) = me.buffered_data.take().expect("checked is_some above");
let buf = SendBuf::Buf(chunk);
me.body_tx
.send_data(buf, is_eos)
.map_err(crate::Error::new_body_write)?;

if is_eos {
return Poll::Ready(Ok(()));
}
continue;
}

// Poll for the next body frame *before* reserving any connection
// flow-control capacity. Reserving capacity speculatively (even a
// single byte) pins that capacity on the connection-level window,
// which can deadlock a second stream when talking to peers that
// only emit WINDOW_UPDATE once their receive window is fully
// exhausted. See #4003.
match ready!(me.stream.as_mut().poll_frame(cx)) {
Some(Ok(frame)) => {
if frame.is_data() {
let chunk = frame.into_data().unwrap_or_else(|_| unreachable!());
let is_eos = me.stream.is_end_stream();
trace!(
"send body chunk: {} bytes, eos={}",
chunk.remaining(),
is_eos,
);
let len = chunk.remaining();
trace!("send body chunk: {} bytes, eos={}", len, is_eos);

let buf = SendBuf::Buf(chunk);
me.body_tx
.send_data(buf, is_eos)
.map_err(crate::Error::new_body_write)?;
if len == 0 {
// Zero-length data frames need no capacity; send
// them straight through so trailing empty frames
// (e.g. an explicit end-of-stream marker) are
// delivered.
let buf = SendBuf::Buf(chunk);
me.body_tx
.send_data(buf, is_eos)
.map_err(crate::Error::new_body_write)?;

if is_eos {
return Poll::Ready(Ok(()));
if is_eos {
return Poll::Ready(Ok(()));
}
continue;
}

// Reserve exactly the chunk size so we never pin more
// connection-level flow-control window than we are
// about to consume. Stash the chunk in `self` so it
// survives the upcoming `poll_capacity` wait even if
// it returns `Poll::Pending`.
me.body_tx.reserve_capacity(len);
*me.buffered_data = Some((chunk, is_eos));
} else if frame.is_trailers() {
// no more DATA, so give any capacity back
me.body_tx.reserve_capacity(0);
Expand Down
167 changes: 167 additions & 0 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1570,6 +1570,7 @@ mod conn {
use bytes::{Buf, Bytes};
use futures_channel::{mpsc, oneshot};
use futures_util::future::{self, poll_fn, FutureExt, TryFutureExt};
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Empty, Full, StreamBody};
use hyper::rt::Timer;
use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream};
Expand Down Expand Up @@ -2909,6 +2910,172 @@ mod conn {
let got_rst = rst_rx.await.expect("server task should complete");
assert!(got_rst, "server should receive RST_STREAM");
}

// https://github.com/hyperium/hyper/issues/4003
//
// An idle `PipeToSendStream` must not reserve any connection-level flow
// control capacity speculatively. If it does, a first stream that has
// filled the connection window will pin the remaining byte(s), and no
// second stream can make progress when talking to a peer that only emits
// `WINDOW_UPDATE` after its receive window is fully exhausted.
#[tokio::test]
async fn h2_idle_stream_does_not_pin_connection_window() {
use std::sync::{Arc, Mutex};

// The HTTP/2 spec fixes the initial connection-level window at 65535
// (RFC 9113 section 6.9.2), and it can only be increased via
// WINDOW_UPDATE. Stream A therefore sends 65534 bytes to leave exactly
// one byte of connection window for stream B.
const STREAM_A_LEN: usize = 65534;

let (client_io, server_io, _) = setup_duplex_test_server();
let (stream_a_full_tx, stream_a_full_rx) = oneshot::channel::<()>();
let (stream_b_got_tx, stream_b_got_rx) = oneshot::channel::<usize>();

// Raw h2 server that never calls `release_capacity`, so no
// connection-level WINDOW_UPDATE is ever sent — mimicking peers that
// only emit WINDOW_UPDATE after their receive window is fully
// exhausted. The main server task accepts streams in a loop so the
// h2 codec is driven continuously; each stream is dispatched to a
// spawned handler that reads the body without ever releasing
// capacity.
//
// The `stream_a_done` channel keeps stream A's server-side request
// alive until the test is done. Dropping the recv side of stream A
// would let h2 auto-release its in-flight recv capacity and emit a
// WINDOW_UPDATE, which would hide the bug.
let (stream_a_done_tx, stream_a_done_rx) = oneshot::channel::<()>();
let stream_a_full_tx = Arc::new(Mutex::new(Some(stream_a_full_tx)));
let stream_b_got_tx = Arc::new(Mutex::new(Some(stream_b_got_tx)));
let stream_a_done_rx = Arc::new(Mutex::new(Some(stream_a_done_rx)));
tokio::spawn(async move {
let mut h2 = h2::server::handshake(server_io).await.unwrap();
let mut seen = 0u32;
while let Some(result) = h2.accept().await {
let (req, mut respond) = result.unwrap();
seen += 1;
let which = seen;
let stream_a_full_tx = stream_a_full_tx.clone();
let stream_b_got_tx = stream_b_got_tx.clone();
let stream_a_done_rx = stream_a_done_rx.clone();
tokio::spawn(async move {
let mut body = req.into_body();
if which == 1 {
// Stream A: drain the burst of body data without ever
// releasing recv capacity, then park on the done
// channel to hold on to the recv stream.
let mut received = 0usize;
while received < STREAM_A_LEN {
let frame = match body.data().await {
Some(Ok(f)) => f,
_ => return,
};
received += frame.len();
// Intentionally do NOT call release_capacity.
}
if let Some(tx) = stream_a_full_tx.lock().unwrap().take() {
let _ = tx.send(());
}
// Keep the recv stream alive so that dropping it
// cannot auto-release connection-level recv capacity
// and emit a WINDOW_UPDATE mid-test.
let done = stream_a_done_rx.lock().unwrap().take();
if let Some(done) = done {
let _ = done.await;
}
// Keep `body` in scope until here.
drop(body);
} else {
// Stream B: record the first data frame and respond.
let mut received = 0usize;
if let Some(Ok(frame)) = body.data().await {
received += frame.len();
}
if let Some(tx) = stream_b_got_tx.lock().unwrap().take() {
let _ = tx.send(received);
}
let mut send = respond.send_response(Response::new(()), false).unwrap();
let _ = send.send_data(Bytes::from_static(b"ok"), true);
}
});
}
});

let io = TokioIo::new(client_io);
let (mut client, conn) = conn::http2::Builder::new(TokioExecutor)
.handshake::<_, BoxBody<Bytes, Box<dyn Error + Send + Sync>>>(io)
.await
.expect("http handshake");
tokio::spawn(async move {
let _ = conn.await;
});

// Request A: streaming body that sends STREAM_A_LEN bytes and then
// stays open, waiting for more data. This fills the advertised
// connection-level window down to one byte remaining.
let (mut tx_a, rx_a) =
mpsc::channel::<Result<Frame<Bytes>, Box<dyn Error + Send + Sync>>>(4);
let body_a: BoxBody<Bytes, Box<dyn Error + Send + Sync>> =
BodyExt::boxed(StreamBody::new(rx_a));
let req_a = Request::post("http://localhost/a").body(body_a).unwrap();
let mut client_a = client.clone();
let a_handle = tokio::spawn(async move { client_a.send_request(req_a).await });

// Push stream A's body in 16 KiB chunks to match the default h2
// `SETTINGS_MAX_FRAME_SIZE`.
use futures_util::SinkExt;
let mut remaining = STREAM_A_LEN;
while remaining > 0 {
let take = remaining.min(16_384);
let bytes = Bytes::from(vec![b'A'; take]);
tx_a.send(Ok(Frame::data(bytes)))
.await
.expect("stream A channel send");
remaining -= take;
}

// Wait for the server to confirm it received the full body on stream
// A, which means the connection window is now down to its last byte.
tokio::time::timeout(Duration::from_secs(5), stream_a_full_rx)
.await
.expect("server should receive full stream A body in time")
.expect("stream_a_full_rx");

// Give the client's `PipeToSendStream` for stream A a moment to park
// itself waiting for more body frames, which (with the bug) would
// speculatively reserve the last byte of connection-level capacity.
for _ in 0..16 {
tokio::task::yield_now().await;
}

// Request B: one byte of body. With the bug in `PipeToSendStream`,
// stream A pins the last byte of connection window via a speculative
// reserve, so stream B can never ship its data frame.
let body_b: BoxBody<Bytes, Box<dyn Error + Send + Sync>> = BodyExt::boxed(
http_body_util::Full::new(Bytes::from_static(b"b"))
.map_err(|never: std::convert::Infallible| match never {}),
);
let req_b = Request::post("http://localhost/b").body(body_b).unwrap();
let b_fut = client.send_request(req_b);

let received_b = tokio::time::timeout(Duration::from_secs(5), stream_b_got_rx)
.await
.expect("stream B must reach the server even while stream A is idle")
.expect("stream_b_got_rx");
assert_eq!(
received_b, 1,
"stream B should deliver its single body byte"
);

// Drive request B to completion so we don't leak the future.
let _ = tokio::time::timeout(Duration::from_secs(5), b_fut).await;

// Close stream A cleanly: first release the server-side handler so
// it drops the recv stream, then drop the body sender.
let _ = stream_a_done_tx.send(());
drop(tx_a);
let _ = tokio::time::timeout(Duration::from_secs(5), a_handle).await;
}
}

trait FutureHyperExt: TryFuture {
Expand Down