diff --git a/src/proto/h2/mod.rs b/src/proto/h2/mod.rs
index 9d4826152e..5dc11a63cc 100644
--- a/src/proto/h2/mod.rs
+++ b/src/proto/h2/mod.rs
@@ -88,6 +88,11 @@ pin_project! {
     {
         body_tx: SendStream<SendBuf<S::Data>>,
         data_done: bool,
+        // A data chunk that has been polled from the body but is still waiting
+        // for stream-level capacity before it can be shipped. Stored here so
+        // it survives across `Poll::Pending` returns from `poll_capacity`; if
+        // we left the chunk in a local, it would be dropped on every repoll.
+        buffered_data: Option<(S::Data, bool)>,
         #[pin]
         stream: S,
     }
@@ -101,6 +106,7 @@ where
         PipeToSendStream {
             body_tx: tx,
             data_done: false,
+            buffered_data: None,
             stream,
         }
     }
@@ -121,13 +127,23 @@ where
     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         let mut me = self.project();
         loop {
-            // we don't have the next chunk of data yet, so just reserve 1 byte to make
-            // sure there's some capacity available. h2 will handle the capacity management
-            // for the actual body chunk.
-            me.body_tx.reserve_capacity(1);
+            // Register for RST_STREAM notification while we wait for the next
+            // body chunk or for send capacity, so the task wakes up if the
+            // peer resets the stream.
+            if let Poll::Ready(reason) = me
+                .body_tx
+                .poll_reset(cx)
+                .map_err(crate::Error::new_body_write)?
+            {
+                debug!("stream received RST_STREAM: {:?}", reason);
+                return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(reason))));
+            }
 
-            if me.body_tx.capacity() == 0 {
-                loop {
+            // If a previously-polled chunk is still waiting for stream-level
+            // send capacity, drive that to completion before touching the
+            // body again.
+            if me.buffered_data.is_some() {
+                while me.body_tx.capacity() == 0 {
                     match ready!(me.body_tx.poll_capacity(cx)) {
                         Some(Ok(0)) => {}
                         Some(Ok(_)) => break,
@@ -142,34 +158,56 @@ where
                         }
                     }
                 }
-            } else if let Poll::Ready(reason) = me
-                .body_tx
-                .poll_reset(cx)
-                .map_err(crate::Error::new_body_write)?
-            {
-                debug!("stream received RST_STREAM: {:?}", reason);
-                return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(reason))));
+
+                let (chunk, is_eos) = me.buffered_data.take().expect("checked is_some above");
+                let buf = SendBuf::Buf(chunk);
+                me.body_tx
+                    .send_data(buf, is_eos)
+                    .map_err(crate::Error::new_body_write)?;
+
+                if is_eos {
+                    return Poll::Ready(Ok(()));
+                }
+                continue;
             }
 
+            // Poll for the next body frame *before* reserving any connection
+            // flow-control capacity. Reserving capacity speculatively (even a
+            // single byte) pins that capacity on the connection-level window,
+            // which can deadlock a second stream when talking to peers that
+            // only emit WINDOW_UPDATE once their receive window is fully
+            // exhausted. See #4003.
             match ready!(me.stream.as_mut().poll_frame(cx)) {
                 Some(Ok(frame)) => {
                     if frame.is_data() {
                         let chunk = frame.into_data().unwrap_or_else(|_| unreachable!());
                         let is_eos = me.stream.is_end_stream();
-                        trace!(
-                            "send body chunk: {} bytes, eos={}",
-                            chunk.remaining(),
-                            is_eos,
-                        );
+                        let len = chunk.remaining();
+                        trace!("send body chunk: {} bytes, eos={}", len, is_eos);
 
-                        let buf = SendBuf::Buf(chunk);
-                        me.body_tx
-                            .send_data(buf, is_eos)
-                            .map_err(crate::Error::new_body_write)?;
+                        if len == 0 {
+                            // Zero-length data frames need no capacity; send
+                            // them straight through so trailing empty frames
+                            // (e.g. an explicit end-of-stream marker) are
+                            // delivered.
+                            let buf = SendBuf::Buf(chunk);
+                            me.body_tx
+                                .send_data(buf, is_eos)
+                                .map_err(crate::Error::new_body_write)?;
 
-                        if is_eos {
-                            return Poll::Ready(Ok(()));
+                            if is_eos {
+                                return Poll::Ready(Ok(()));
+                            }
+                            continue;
                         }
+
+                        // Reserve exactly the chunk size so we never pin more
+                        // connection-level flow-control window than we are
+                        // about to consume. Stash the chunk in `self` so it
+                        // survives the upcoming `poll_capacity` wait even if
+                        // it returns `Poll::Pending`.
+                        me.body_tx.reserve_capacity(len);
+                        *me.buffered_data = Some((chunk, is_eos));
                     } else if frame.is_trailers() {
                         // no more DATA, so give any capacity back
                         me.body_tx.reserve_capacity(0);
diff --git a/tests/client.rs b/tests/client.rs
index e80459327a..fce22987c2 100644
--- a/tests/client.rs
+++ b/tests/client.rs
@@ -1570,6 +1570,7 @@ mod conn {
     use bytes::{Buf, Bytes};
     use futures_channel::{mpsc, oneshot};
     use futures_util::future::{self, poll_fn, FutureExt, TryFutureExt};
+    use http_body_util::combinators::BoxBody;
     use http_body_util::{BodyExt, Empty, Full, StreamBody};
     use hyper::rt::Timer;
     use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream};
@@ -2909,6 +2910,172 @@ mod conn {
         let got_rst = rst_rx.await.expect("server task should complete");
         assert!(got_rst, "server should receive RST_STREAM");
     }
+
+    // https://github.com/hyperium/hyper/issues/4003
+    //
+    // An idle `PipeToSendStream` must not reserve any connection-level flow
+    // control capacity speculatively. If it does, a first stream that has
+    // filled the connection window will pin the remaining byte(s), and no
+    // second stream can make progress when talking to a peer that only emits
+    // `WINDOW_UPDATE` after its receive window is fully exhausted.
+    #[tokio::test]
+    async fn h2_idle_stream_does_not_pin_connection_window() {
+        use std::sync::{Arc, Mutex};
+
+        // The HTTP/2 spec fixes the initial connection-level window at 65535
+        // (RFC 9113 section 6.9.2), and it can only be increased via
+        // WINDOW_UPDATE. Stream A therefore sends 65534 bytes to leave exactly
+        // one byte of connection window for stream B.
+        const STREAM_A_LEN: usize = 65534;
+
+        let (client_io, server_io, _) = setup_duplex_test_server();
+        let (stream_a_full_tx, stream_a_full_rx) = oneshot::channel::<()>();
+        let (stream_b_got_tx, stream_b_got_rx) = oneshot::channel::<usize>();
+
+        // Raw h2 server that never calls `release_capacity`, so no
+        // connection-level WINDOW_UPDATE is ever sent — mimicking peers that
+        // only emit WINDOW_UPDATE after their receive window is fully
+        // exhausted. The main server task accepts streams in a loop so the
+        // h2 codec is driven continuously; each stream is dispatched to a
+        // spawned handler that reads the body without ever releasing
+        // capacity.
+        //
+        // The `stream_a_done` channel keeps stream A's server-side request
+        // alive until the test is done. Dropping the recv side of stream A
+        // would let h2 auto-release its in-flight recv capacity and emit a
+        // WINDOW_UPDATE, which would hide the bug.
+        let (stream_a_done_tx, stream_a_done_rx) = oneshot::channel::<()>();
+        let stream_a_full_tx = Arc::new(Mutex::new(Some(stream_a_full_tx)));
+        let stream_b_got_tx = Arc::new(Mutex::new(Some(stream_b_got_tx)));
+        let stream_a_done_rx = Arc::new(Mutex::new(Some(stream_a_done_rx)));
+        tokio::spawn(async move {
+            let mut h2 = h2::server::handshake(server_io).await.unwrap();
+            let mut seen = 0u32;
+            while let Some(result) = h2.accept().await {
+                let (req, mut respond) = result.unwrap();
+                seen += 1;
+                let which = seen;
+                let stream_a_full_tx = stream_a_full_tx.clone();
+                let stream_b_got_tx = stream_b_got_tx.clone();
+                let stream_a_done_rx = stream_a_done_rx.clone();
+                tokio::spawn(async move {
+                    let mut body = req.into_body();
+                    if which == 1 {
+                        // Stream A: drain the burst of body data without ever
+                        // releasing recv capacity, then park on the done
+                        // channel to hold on to the recv stream.
+                        let mut received = 0usize;
+                        while received < STREAM_A_LEN {
+                            let frame = match body.data().await {
+                                Some(Ok(f)) => f,
+                                _ => return,
+                            };
+                            received += frame.len();
+                            // Intentionally do NOT call release_capacity.
+                        }
+                        if let Some(tx) = stream_a_full_tx.lock().unwrap().take() {
+                            let _ = tx.send(());
+                        }
+                        // Keep the recv stream alive so that dropping it
+                        // cannot auto-release connection-level recv capacity
+                        // and emit a WINDOW_UPDATE mid-test.
+                        let done = stream_a_done_rx.lock().unwrap().take();
+                        if let Some(done) = done {
+                            let _ = done.await;
+                        }
+                        // Keep `body` in scope until here.
+                        drop(body);
+                    } else {
+                        // Stream B: record the first data frame and respond.
+                        let mut received = 0usize;
+                        if let Some(Ok(frame)) = body.data().await {
+                            received += frame.len();
+                        }
+                        if let Some(tx) = stream_b_got_tx.lock().unwrap().take() {
+                            let _ = tx.send(received);
+                        }
+                        let mut send = respond.send_response(Response::new(()), false).unwrap();
+                        let _ = send.send_data(Bytes::from_static(b"ok"), true);
+                    }
+                });
+            }
+        });
+
+        let io = TokioIo::new(client_io);
+        let (mut client, conn) = conn::http2::Builder::new(TokioExecutor)
+            .handshake::<_, BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>>(io)
+            .await
+            .expect("http handshake");
+        tokio::spawn(async move {
+            let _ = conn.await;
+        });
+
+        // Request A: streaming body that sends STREAM_A_LEN bytes and then
+        // stays open, waiting for more data. This fills the advertised
+        // connection-level window down to one byte remaining.
+        let (mut tx_a, rx_a) =
+            mpsc::channel::<Result<Frame<Bytes>, Box<dyn std::error::Error + Send + Sync>>>(4);
+        let body_a: BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>> =
+            BodyExt::boxed(StreamBody::new(rx_a));
+        let req_a = Request::post("http://localhost/a").body(body_a).unwrap();
+        let mut client_a = client.clone();
+        let a_handle = tokio::spawn(async move { client_a.send_request(req_a).await });
+
+        // Push stream A's body in 16 KiB chunks to match the default h2
+        // `SETTINGS_MAX_FRAME_SIZE`.
+        use futures_util::SinkExt;
+        let mut remaining = STREAM_A_LEN;
+        while remaining > 0 {
+            let take = remaining.min(16_384);
+            let bytes = Bytes::from(vec![b'A'; take]);
+            tx_a.send(Ok(Frame::data(bytes)))
+                .await
+                .expect("stream A channel send");
+            remaining -= take;
+        }
+
+        // Wait for the server to confirm it received the full body on stream
+        // A, which means the connection window is now down to its last byte.
+        tokio::time::timeout(Duration::from_secs(5), stream_a_full_rx)
+            .await
+            .expect("server should receive full stream A body in time")
+            .expect("stream_a_full_rx");
+
+        // Give the client's `PipeToSendStream` for stream A a moment to park
+        // itself waiting for more body frames, which (with the bug) would
+        // speculatively reserve the last byte of connection-level capacity.
+        for _ in 0..16 {
+            tokio::task::yield_now().await;
+        }
+
+        // Request B: one byte of body. With the bug in `PipeToSendStream`,
+        // stream A pins the last byte of connection window via a speculative
+        // reserve, so stream B can never ship its data frame.
+        let body_b: BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>> = BodyExt::boxed(
+            http_body_util::Full::new(Bytes::from_static(b"b"))
+                .map_err(|never: std::convert::Infallible| match never {}),
+        );
+        let req_b = Request::post("http://localhost/b").body(body_b).unwrap();
+        let b_fut = client.send_request(req_b);
+
+        let received_b = tokio::time::timeout(Duration::from_secs(5), stream_b_got_rx)
+            .await
+            .expect("stream B must reach the server even while stream A is idle")
+            .expect("stream_b_got_rx");
+        assert_eq!(
+            received_b, 1,
+            "stream B should deliver its single body byte"
+        );
+
+        // Drive request B to completion so we don't leak the future.
+        let _ = tokio::time::timeout(Duration::from_secs(5), b_fut).await;
+
+        // Close stream A cleanly: first release the server-side handler so
+        // it drops the recv stream, then drop the body sender.
+        let _ = stream_a_done_tx.send(());
+        drop(tx_a);
+        let _ = tokio::time::timeout(Duration::from_secs(5), a_handle).await;
+    }
 }
 
 trait FutureHyperExt: TryFuture {