From 29f719faf2f35910c1622db7f787b16f227d5245 Mon Sep 17 00:00:00 2001 From: barry3406 Date: Fri, 10 Apr 2026 10:19:28 -0700 Subject: [PATCH] fix(http2): do not reserve capacity before body data is available PipeToSendStream used to call `reserve_capacity(1)` at the top of every poll iteration as a probe, before asking the body for the next chunk. The reservation is immediately assigned from the connection-level flow-control window, and while the stream is parked waiting for more body data the reservation keeps the last byte of the connection window pinned to the stream. Against peers that only emit a WINDOW_UPDATE once their receive window is fully exhausted (for example Bun's built-in HTTP/2 server, as well as other implementations with a similar strategy), this one-byte reservation is enough to deadlock a second concurrent stream: the connection window never drops to zero on the peer, so no WINDOW_UPDATE is ever sent, and the second stream can never get any capacity. Restructure the loop so it polls the body for the next frame first and only reserves capacity equal to the chunk's exact size. The polled chunk is stashed in a new `buffered_data` field so it survives the `poll_capacity` wait across `Poll::Pending` returns without being dropped. Zero-length data frames are forwarded immediately without touching the reservation. `poll_reset` is now registered at the top of every iteration so RST_STREAM still wakes the task while it waits for either more body data or more capacity. Add a regression test that pairs a streaming request filling the connection window with a second one-byte request, talking to a raw h2::server that never releases recv capacity, and asserts the second request reaches the server. Closes #4003 --- src/proto/h2/mod.rs | 86 ++++++++++++++++------- tests/client.rs | 167 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 229 insertions(+), 24 deletions(-) diff --git a/src/proto/h2/mod.rs b/src/proto/h2/mod.rs index 9d4826152e..5dc11a63cc 100644 --- a/src/proto/h2/mod.rs +++ b/src/proto/h2/mod.rs @@ -88,6 +88,11 @@ pin_project! { { body_tx: SendStream>, data_done: bool, + // A data chunk that has been polled from the body but is still waiting + // for stream-level capacity before it can be shipped. Stored here so + // it survives across `Poll::Pending` returns from `poll_capacity`; if + // we left the chunk in a local, it would be dropped on every repoll. + buffered_data: Option<(S::Data, bool)>, #[pin] stream: S, } @@ -101,6 +106,7 @@ where PipeToSendStream { body_tx: tx, data_done: false, + buffered_data: None, stream, } } @@ -121,13 +127,23 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut me = self.project(); loop { - // we don't have the next chunk of data yet, so just reserve 1 byte to make - // sure there's some capacity available. h2 will handle the capacity management - // for the actual body chunk. - me.body_tx.reserve_capacity(1); + // Register for RST_STREAM notification while we wait for the next + // body chunk or for send capacity, so the task wakes up if the + // peer resets the stream. + if let Poll::Ready(reason) = me + .body_tx + .poll_reset(cx) + .map_err(crate::Error::new_body_write)? + { + debug!("stream received RST_STREAM: {:?}", reason); + return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(reason)))); + } - if me.body_tx.capacity() == 0 { - loop { + // If a previously-polled chunk is still waiting for stream-level + // send capacity, drive that to completion before touching the + // body again. + if me.buffered_data.is_some() { + while me.body_tx.capacity() == 0 { match ready!(me.body_tx.poll_capacity(cx)) { Some(Ok(0)) => {} Some(Ok(_)) => break, @@ -142,34 +158,56 @@ where } } } - } else if let Poll::Ready(reason) = me - .body_tx - .poll_reset(cx) - .map_err(crate::Error::new_body_write)? - { - debug!("stream received RST_STREAM: {:?}", reason); - return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(reason)))); + + let (chunk, is_eos) = me.buffered_data.take().expect("checked is_some above"); + let buf = SendBuf::Buf(chunk); + me.body_tx + .send_data(buf, is_eos) + .map_err(crate::Error::new_body_write)?; + + if is_eos { + return Poll::Ready(Ok(())); + } + continue; } + // Poll for the next body frame *before* reserving any connection + // flow-control capacity. Reserving capacity speculatively (even a + // single byte) pins that capacity on the connection-level window, + // which can deadlock a second stream when talking to peers that + // only emit WINDOW_UPDATE once their receive window is fully + // exhausted. See #4003. match ready!(me.stream.as_mut().poll_frame(cx)) { Some(Ok(frame)) => { if frame.is_data() { let chunk = frame.into_data().unwrap_or_else(|_| unreachable!()); let is_eos = me.stream.is_end_stream(); - trace!( - "send body chunk: {} bytes, eos={}", - chunk.remaining(), - is_eos, - ); + let len = chunk.remaining(); + trace!("send body chunk: {} bytes, eos={}", len, is_eos); - let buf = SendBuf::Buf(chunk); - me.body_tx - .send_data(buf, is_eos) - .map_err(crate::Error::new_body_write)?; + if len == 0 { + // Zero-length data frames need no capacity; send + // them straight through so trailing empty frames + // (e.g. an explicit end-of-stream marker) are + // delivered. + let buf = SendBuf::Buf(chunk); + me.body_tx + .send_data(buf, is_eos) + .map_err(crate::Error::new_body_write)?; - if is_eos { - return Poll::Ready(Ok(())); + if is_eos { + return Poll::Ready(Ok(())); + } + continue; } + + // Reserve exactly the chunk size so we never pin more + // connection-level flow-control window than we are + // about to consume. Stash the chunk in `self` so it + // survives the upcoming `poll_capacity` wait even if + // it returns `Poll::Pending`. + me.body_tx.reserve_capacity(len); + *me.buffered_data = Some((chunk, is_eos)); } else if frame.is_trailers() { // no more DATA, so give any capacity back me.body_tx.reserve_capacity(0); diff --git a/tests/client.rs b/tests/client.rs index e80459327a..fce22987c2 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1570,6 +1570,7 @@ mod conn { use bytes::{Buf, Bytes}; use futures_channel::{mpsc, oneshot}; use futures_util::future::{self, poll_fn, FutureExt, TryFutureExt}; + use http_body_util::combinators::BoxBody; use http_body_util::{BodyExt, Empty, Full, StreamBody}; use hyper::rt::Timer; use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream}; @@ -2909,6 +2910,172 @@ mod conn { let got_rst = rst_rx.await.expect("server task should complete"); assert!(got_rst, "server should receive RST_STREAM"); } + + // https://github.com/hyperium/hyper/issues/4003 + // + // An idle `PipeToSendStream` must not reserve any connection-level flow + // control capacity speculatively. If it does, a first stream that has + // filled the connection window will pin the remaining byte(s), and no + // second stream can make progress when talking to a peer that only emits + // `WINDOW_UPDATE` after its receive window is fully exhausted. + #[tokio::test] + async fn h2_idle_stream_does_not_pin_connection_window() { + use std::sync::{Arc, Mutex}; + + // The HTTP/2 spec fixes the initial connection-level window at 65535 + // (RFC 9113 section 6.9.2), and it can only be increased via + // WINDOW_UPDATE. Stream A therefore sends 65534 bytes to leave exactly + // one byte of connection window for stream B. + const STREAM_A_LEN: usize = 65534; + + let (client_io, server_io, _) = setup_duplex_test_server(); + let (stream_a_full_tx, stream_a_full_rx) = oneshot::channel::<()>(); + let (stream_b_got_tx, stream_b_got_rx) = oneshot::channel::(); + + // Raw h2 server that never calls `release_capacity`, so no + // connection-level WINDOW_UPDATE is ever sent — mimicking peers that + // only emit WINDOW_UPDATE after their receive window is fully + // exhausted. The main server task accepts streams in a loop so the + // h2 codec is driven continuously; each stream is dispatched to a + // spawned handler that reads the body without ever releasing + // capacity. + // + // The `stream_a_done` channel keeps stream A's server-side request + // alive until the test is done. Dropping the recv side of stream A + // would let h2 auto-release its in-flight recv capacity and emit a + // WINDOW_UPDATE, which would hide the bug. + let (stream_a_done_tx, stream_a_done_rx) = oneshot::channel::<()>(); + let stream_a_full_tx = Arc::new(Mutex::new(Some(stream_a_full_tx))); + let stream_b_got_tx = Arc::new(Mutex::new(Some(stream_b_got_tx))); + let stream_a_done_rx = Arc::new(Mutex::new(Some(stream_a_done_rx))); + tokio::spawn(async move { + let mut h2 = h2::server::handshake(server_io).await.unwrap(); + let mut seen = 0u32; + while let Some(result) = h2.accept().await { + let (req, mut respond) = result.unwrap(); + seen += 1; + let which = seen; + let stream_a_full_tx = stream_a_full_tx.clone(); + let stream_b_got_tx = stream_b_got_tx.clone(); + let stream_a_done_rx = stream_a_done_rx.clone(); + tokio::spawn(async move { + let mut body = req.into_body(); + if which == 1 { + // Stream A: drain the burst of body data without ever + // releasing recv capacity, then park on the done + // channel to hold on to the recv stream. + let mut received = 0usize; + while received < STREAM_A_LEN { + let frame = match body.data().await { + Some(Ok(f)) => f, + _ => return, + }; + received += frame.len(); + // Intentionally do NOT call release_capacity. + } + if let Some(tx) = stream_a_full_tx.lock().unwrap().take() { + let _ = tx.send(()); + } + // Keep the recv stream alive so that dropping it + // cannot auto-release connection-level recv capacity + // and emit a WINDOW_UPDATE mid-test. + let done = stream_a_done_rx.lock().unwrap().take(); + if let Some(done) = done { + let _ = done.await; + } + // Keep `body` in scope until here. + drop(body); + } else { + // Stream B: record the first data frame and respond. + let mut received = 0usize; + if let Some(Ok(frame)) = body.data().await { + received += frame.len(); + } + if let Some(tx) = stream_b_got_tx.lock().unwrap().take() { + let _ = tx.send(received); + } + let mut send = respond.send_response(Response::new(()), false).unwrap(); + let _ = send.send_data(Bytes::from_static(b"ok"), true); + } + }); + } + }); + + let io = TokioIo::new(client_io); + let (mut client, conn) = conn::http2::Builder::new(TokioExecutor) + .handshake::<_, BoxBody>>(io) + .await + .expect("http handshake"); + tokio::spawn(async move { + let _ = conn.await; + }); + + // Request A: streaming body that sends STREAM_A_LEN bytes and then + // stays open, waiting for more data. This fills the advertised + // connection-level window down to one byte remaining. + let (mut tx_a, rx_a) = + mpsc::channel::, Box>>(4); + let body_a: BoxBody> = + BodyExt::boxed(StreamBody::new(rx_a)); + let req_a = Request::post("http://localhost/a").body(body_a).unwrap(); + let mut client_a = client.clone(); + let a_handle = tokio::spawn(async move { client_a.send_request(req_a).await }); + + // Push stream A's body in 16 KiB chunks to match the default h2 + // `SETTINGS_MAX_FRAME_SIZE`. + use futures_util::SinkExt; + let mut remaining = STREAM_A_LEN; + while remaining > 0 { + let take = remaining.min(16_384); + let bytes = Bytes::from(vec![b'A'; take]); + tx_a.send(Ok(Frame::data(bytes))) + .await + .expect("stream A channel send"); + remaining -= take; + } + + // Wait for the server to confirm it received the full body on stream + // A, which means the connection window is now down to its last byte. + tokio::time::timeout(Duration::from_secs(5), stream_a_full_rx) + .await + .expect("server should receive full stream A body in time") + .expect("stream_a_full_rx"); + + // Give the client's `PipeToSendStream` for stream A a moment to park + // itself waiting for more body frames, which (with the bug) would + // speculatively reserve the last byte of connection-level capacity. + for _ in 0..16 { + tokio::task::yield_now().await; + } + + // Request B: one byte of body. With the bug in `PipeToSendStream`, + // stream A pins the last byte of connection window via a speculative + // reserve, so stream B can never ship its data frame. + let body_b: BoxBody> = BodyExt::boxed( + http_body_util::Full::new(Bytes::from_static(b"b")) + .map_err(|never: std::convert::Infallible| match never {}), + ); + let req_b = Request::post("http://localhost/b").body(body_b).unwrap(); + let b_fut = client.send_request(req_b); + + let received_b = tokio::time::timeout(Duration::from_secs(5), stream_b_got_rx) + .await + .expect("stream B must reach the server even while stream A is idle") + .expect("stream_b_got_rx"); + assert_eq!( + received_b, 1, + "stream B should deliver its single body byte" + ); + + // Drive request B to completion so we don't leak the future. + let _ = tokio::time::timeout(Duration::from_secs(5), b_fut).await; + + // Close stream A cleanly: first release the server-side handler so + // it drops the recv stream, then drop the body sender. + let _ = stream_a_done_tx.send(()); + drop(tx_a); + let _ = tokio::time::timeout(Duration::from_secs(5), a_handle).await; + } } trait FutureHyperExt: TryFuture {