diff --git a/lib/wreq_ruby/response.rb b/lib/wreq_ruby/response.rb index d06cbc6..d1194a1 100644 --- a/lib/wreq_ruby/response.rb +++ b/lib/wreq_ruby/response.rb @@ -110,14 +110,17 @@ def text(default_encoding: "UTF-8") def json end - # Get a streaming iterator for the response body, yielding each chunk. + # Stream the response body, yielding each chunk to the given block. # # This method allows you to process large HTTP responses efficiently, # by yielding each chunk of the body as it arrives, without loading # the entire response into memory. # - # @return An iterator over response body chunks (binary String) + # @return [nil] # @yield [chunk] Each chunk of the response body as a binary String + # @raise [LocalJumpError] if called without a block + # @raise [Wreq::TimeoutError, Wreq::BodyError, Wreq::ConnectionResetError, Wreq::RequestError] + # if streaming fails while reading the response body # @example Save response to file # File.open("output.bin", "wb") do |f| # response.chunks { |chunk| f.write(chunk) } @@ -127,7 +130,7 @@ def json # response.chunks { |chunk| total += chunk.bytesize } # puts "Downloaded #{total} bytes" # - # Note: The returned Receiver is only for reading response bodies, not for uploads. + # Exceptions raised inside the block are propagated to the caller. def chunks end diff --git a/src/client/body/stream.rs b/src/client/body/stream.rs index dbf13b4..d34e8c4 100644 --- a/src/client/body/stream.rs +++ b/src/client/body/stream.rs @@ -13,7 +13,7 @@ use tokio::sync::{ }; use crate::{ - error::{memory_error, mpsc_send_error_to_magnus}, + error::{memory_error, mpsc_send_error_to_magnus, wreq_error_to_magnus}, rt, }; @@ -37,20 +37,15 @@ impl BodyReceiver { pub fn new(stream: impl Stream> + Send + 'static) -> BodyReceiver { BodyReceiver(Mutex::new(Box::pin(stream))) } -} -impl Iterator for BodyReceiver { - type Item = Bytes; - - fn next(&mut self) -> Option { - rt::maybe_block_on(async { - self.0 - .lock() - .await - .as_mut() - .next() - .await - .and_then(|r| r.ok()) + /// Read the next body chunk, converting stream errors into Ruby errors. + pub fn next(&self) -> Result, Error> { + rt::try_block_on(async { + match self.0.lock().await.as_mut().next().await { + Some(Ok(data)) => Ok(Some(data)), + Some(Err(err)) => Err(wreq_error_to_magnus(err)), + None => Ok(None), + } }) } } diff --git a/src/client/resp.rs b/src/client/resp.rs index 1aeab97..ce35057 100644 --- a/src/client/resp.rs +++ b/src/client/resp.rs @@ -5,14 +5,14 @@ use bytes::Bytes; use futures_util::TryFutureExt; use http::{Extensions, HeaderMap, response::Response as HttpResponse}; use http_body_util::BodyExt; -use magnus::{Error, Module, RArray, RModule, Ruby, Value, block::Yield, scan_args::scan_args}; +use magnus::{Error, Module, RArray, RModule, Ruby, Value, scan_args::scan_args}; use wreq::Uri; use crate::{ client::body::{BodyReceiver, Json}, cookie::Cookie, - error::{memory_error, wreq_error_to_magnus}, - gvl, + error::{memory_error, no_block_given_error, wreq_error_to_magnus}, + gvl::{self, nogvl}, header::Headers, http::{StatusCode, Version}, rt, @@ -199,12 +199,24 @@ impl Response { }) } - /// Get a chunk iterator for the response body. - pub fn chunks(&self) -> Result, Error> { - self.response(true) - .map(wreq::Response::bytes_stream) - .map(BodyReceiver::new) - .map(Yield::Iter) + /// Yield response body chunks to the given Ruby block. + pub fn chunks(ruby: &Ruby, rb_self: &Self) -> Result<(), Error> { + if !ruby.block_given() { + return Err(no_block_given_error()); + } + + let receiver = nogvl(|| { + rb_self + .response(true) + .map(wreq::Response::bytes_stream) + .map(BodyReceiver::new) + })?; + + while let Some(chunk) = receiver.next()? { + let _: Value = ruby.yield_value(chunk)?; + } + + Ok(()) } /// Close the response body, dropping any resources. @@ -222,23 +234,23 @@ impl Drop for Response { } pub fn include(ruby: &Ruby, gem_module: &RModule) -> Result<(), Error> { - let response_class = gem_module.define_class("Response", ruby.class_object())?; - response_class.define_method("code", magnus::method!(Response::code, 0))?; - response_class.define_method("status", magnus::method!(Response::status, 0))?; - response_class.define_method("version", magnus::method!(Response::version, 0))?; - response_class.define_method("url", magnus::method!(Response::url, 0))?; - response_class.define_method( + let response = gem_module.define_class("Response", ruby.class_object())?; + response.define_method("code", magnus::method!(Response::code, 0))?; + response.define_method("status", magnus::method!(Response::status, 0))?; + response.define_method("version", magnus::method!(Response::version, 0))?; + response.define_method("url", magnus::method!(Response::url, 0))?; + response.define_method( "content_length", magnus::method!(Response::content_length, 0), )?; - response_class.define_method("cookies", magnus::method!(Response::cookies, 0))?; - response_class.define_method("headers", magnus::method!(Response::headers, 0))?; - response_class.define_method("local_addr", magnus::method!(Response::local_addr, 0))?; - response_class.define_method("remote_addr", magnus::method!(Response::remote_addr, 0))?; - response_class.define_method("bytes", magnus::method!(Response::bytes, 0))?; - response_class.define_method("text", magnus::method!(Response::text, -1))?; - response_class.define_method("json", magnus::method!(Response::json, 0))?; - response_class.define_method("chunks", magnus::method!(Response::chunks, 0))?; - response_class.define_method("close", magnus::method!(Response::close, 0))?; + response.define_method("cookies", magnus::method!(Response::cookies, 0))?; + response.define_method("headers", magnus::method!(Response::headers, 0))?; + response.define_method("local_addr", magnus::method!(Response::local_addr, 0))?; + response.define_method("remote_addr", magnus::method!(Response::remote_addr, 0))?; + response.define_method("bytes", magnus::method!(Response::bytes, 0))?; + response.define_method("text", magnus::method!(Response::text, -1))?; + response.define_method("json", magnus::method!(Response::json, 0))?; + response.define_method("chunks", magnus::method!(Response::chunks, 0))?; + response.define_method("close", magnus::method!(Response::close, 0))?; Ok(()) } diff --git a/src/error.rs b/src/error.rs index edb6932..6cf9521 100644 --- a/src/error.rs +++ b/src/error.rs @@ -85,6 +85,14 @@ pub fn interrupt_error() -> MagnusError { MagnusError::new(ruby!().get_inner(&INTERRUPT_ERROR), "request interrupted") } +/// LocalJumpError for methods that require a Ruby block. +pub fn no_block_given_error() -> MagnusError { + MagnusError::new( + ruby!().exception_local_jump_error(), + "no block given (yield)", + ) +} + /// Map [`tokio::sync::mpsc::error::SendError`] to corresponding [`magnus::Error`] pub fn mpsc_send_error_to_magnus(err: SendError) -> MagnusError { MagnusError::new( diff --git a/src/rt.rs b/src/rt.rs index 1bd71f5..ebedd6f 100644 --- a/src/rt.rs +++ b/src/rt.rs @@ -27,21 +27,3 @@ where }) }) } - -/// Block on a future to completion on the global Tokio runtime, -/// returning `None` if cancelled via the provided `CancelFlag`. -#[inline] -pub fn maybe_block_on(future: F) -> F::Output -where - F: Future>, -{ - gvl::nogvl_cancellable(|flag| { - RUNTIME.block_on(async move { - tokio::select! { - biased; - _ = flag.cancelled() => None, - result = future => result, - } - }) - }) -} diff --git a/test/gvl_stream_test.rb b/test/gvl_stream_test.rb new file mode 100644 index 0000000..fbb429a --- /dev/null +++ b/test/gvl_stream_test.rb @@ -0,0 +1,330 @@ +# frozen_string_literal: true + +require "test_helper" + +class GvlStreamingTest < Minitest::Test + # ========================================================================= + # Basic streaming functionality + # ========================================================================= + + def test_chunks_yields_string_chunks + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/5") + chunks = [] + resp.chunks do |chunk| + chunks << chunk + assert_kind_of String, chunk, "Each yielded chunk must be a String" + end + assert_equal 5, chunks.size, "Should yield exactly 5 chunks from /stream/5" + end + + def test_chunks_yields_binary_encoding + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/3") + resp.chunks do |chunk| + assert chunk.encoding == Encoding::BINARY || chunk.encoding == Encoding::ASCII_8BIT, + "Chunk should have binary encoding, got #{chunk.encoding}" + end + end + + def test_chunks_with_single_chunk_body + client = Wreq::Client.new + resp = client.get("http://localhost:8080/bytes/1024") + chunk_count = 0 + total_bytes = 0 + resp.chunks do |chunk| + chunk_count += 1 + total_bytes += chunk.bytesize + end + assert chunk_count >= 1, "Should yield at least one chunk" + assert_equal 1024, total_bytes, "Total bytes should match content length" + end + + def test_chunks_returns_nil + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/3") + result = resp.chunks { |_chunk| :processing } + assert_nil result, "chunks should return nil after completion" + end + + def test_chunks_with_empty_body + client = Wreq::Client.new + resp = client.get("http://localhost:8080/status/204") + chunk_count = 0 + resp.chunks do |_chunk| + chunk_count += 1 + end + assert_equal 0, chunk_count, "No chunks should be yielded for empty 204 response" + end + + # ========================================================================= + # Block requirement + # ========================================================================= + + def test_chunks_without_block_raises_error + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/3") + assert_raises(LocalJumpError) do + resp.chunks + end + end + + # ========================================================================= + # GVL concurrency — other Ruby threads can run during streaming + # ========================================================================= + + def test_other_threads_run_during_streaming + client = Wreq::Client.new + resp = client.get("http://localhost:8080/drip?duration=3&numbytes=3&delay=1") + + counter = 0 + tick_thread = Thread.new do + 30.times do + counter += 1 + sleep 0.1 + end + end + + chunks_received = 0 + resp.chunks do |_chunk| + chunks_received += 1 + end + + tick_thread.join(10) + + assert counter > 5, + "Counter only reached #{counter} — other threads may not be running during streaming. " \ + "GVL should be released during I/O waits." + assert chunks_received >= 1, "Should have received at least one chunk" + end + + def test_multiple_concurrent_streams + client = Wreq::Client.new + results = {} + done = {} + + t1 = Thread.new do + resp = client.get("http://localhost:8080/stream/3") + chunks = [] + resp.chunks { |c| chunks << c } + results[:t1] = chunks.size + done[:t1] = true + end + + t2 = Thread.new do + resp = client.get("http://localhost:8080/stream/3") + chunks = [] + resp.chunks { |c| chunks << c } + results[:t2] = chunks.size + done[:t2] = true + end + + t1.join(15) + t2.join(15) + + assert done[:t1], "Thread 1 should complete" + assert done[:t2], "Thread 2 should complete" + assert_equal 3, results[:t1], "Thread 1 should receive 3 chunks" + assert_equal 3, results[:t2], "Thread 2 should receive 3 chunks" + end + + # ========================================================================= + # Thread interruption during streaming + # ========================================================================= + + def test_thread_interrupt_during_streaming + url = "http://localhost:8080/drip?duration=5&numbytes=5" + thread = Thread.new do + resp = Wreq.get(url) + resp.chunks { |chunk| chunk } + rescue => _ + end + sleep 2 + thread.kill + killed = thread.join(5) + assert killed, "Streaming should be interruptible via Thread.kill" + end + + def test_thread_interrupt_during_slow_stream_with_block_processing + url = "http://localhost:8080/drip?duration=5&numbytes=5&delay=1" + thread = Thread.new do + resp = Wreq.get(url) + resp.chunks do |_chunk| + sleep 0.5 + end + rescue => _ + end + sleep 2 + thread.kill + killed = thread.join(5) + assert killed, "Streaming with slow block processing should be interruptible" + end + + # ========================================================================= + # Streaming error propagation + # ========================================================================= + + def test_chunks_propagates_streaming_errors + client = Wreq::Client.new + resp = client.get("http://localhost:8080/drip?duration=10&numbytes=10", timeout: 1) + error_raised = false + begin + resp.chunks do |_chunk| + # Just consume chunks — the timeout should fire mid-stream + end + rescue => e + error_raised = true + assert( + e.is_a?(Wreq::TimeoutError) || e.is_a?(Wreq::BodyError) || e.is_a?(Wreq::ConnectionResetError), + "Expected a streaming error (TimeoutError/BodyError/ConnectionResetError), got #{e.class}: #{e.message}" + ) + end + assert error_raised, "A streaming error should have been raised for a timed-out drip response" + end + + # ========================================================================= + # Block exception propagation + # ========================================================================= + + def test_exception_in_block_propagates + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/5") + error_raised = false + chunks_before_error = 0 + begin + resp.chunks do |chunk| + chunks_before_error += 1 + if chunks_before_error == 2 + raise RuntimeError, "intentional error in block" + end + end + rescue RuntimeError => e + error_raised = true + assert_equal "intentional error in block", e.message + end + assert error_raised, "Exception raised inside the block should propagate out" + assert_equal 2, chunks_before_error, "Should have processed 2 chunks before the error" + end + + # ========================================================================= + # Double-consumption protection + # ========================================================================= + + def test_chunks_called_twice_raises_error + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/3") + resp.chunks { |_c| } + + error_raised = false + begin + resp.chunks { |_c| } + rescue => e + error_raised = true + assert_instance_of Wreq::MemoryError, e, + "Second chunks call should raise MemoryError, got #{e.class}: #{e.message}" + end + assert error_raised, "Second chunks call should raise an error" + end + + def test_text_after_chunks_raises_error + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/3") + resp.chunks { |_c| } + + error_raised = false + begin + resp.text + rescue => e + error_raised = true + assert_instance_of Wreq::MemoryError, e, + "Calling text after chunks should raise MemoryError, got #{e.class}: #{e.message}" + end + assert error_raised, "Calling text after chunks should raise an error" + end + + # ========================================================================= + # Chunk content integrity + # ========================================================================= + + def test_chunks_content_matches_full_body + client = Wreq::Client.new + resp_full = client.get("http://localhost:8080/bytes/4096") + full_bytes = resp_full.bytes + + resp_stream = client.get("http://localhost:8080/bytes/4096") + streamed_bytes = "".b + resp_stream.chunks do |chunk| + streamed_bytes << chunk + end + + assert_equal full_bytes.bytesize, streamed_bytes.bytesize, + "Streamed body size should match full body size" + end + + def test_chunks_json_stream_content + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/5") + chunks = [] + resp.chunks do |chunk| + chunks << chunk + end + + chunks.each_with_index do |chunk, i| + assert_match(/\{.*\}/, chunk, + "Chunk #{i} should contain a JSON object, got: #{chunk[0..80]}") + end + end + + # ========================================================================= + # Block capture / GC safety + # ========================================================================= + + def test_block_not_garbage_collected_during_streaming + client = Wreq::Client.new + resp = client.get("http://localhost:8080/drip?duration=3&numbytes=3&delay=1") + + chunks_received = 0 + resp.chunks do |chunk| + chunks_received += 1 + GC.start + GC.start + end + + assert_equal 3, chunks_received, + "All 3 chunks should be received even with forced GC between yields" + end + + # ========================================================================= + # close() during/after streaming + # ========================================================================= + + def test_close_after_streaming + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/3") + resp.chunks { |_c| } + resp.close + end + + # ========================================================================= + # Client method variants + # ========================================================================= + + def test_chunks_via_module_method + resp = Wreq.get("http://localhost:8080/stream/3") + chunks = [] + resp.chunks do |chunk| + chunks << chunk + end + assert_equal 3, chunks.size, "Module-level Wreq.get + chunks should work" + end + + def test_chunks_via_client_instance + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/3") + chunks = [] + resp.chunks do |chunk| + chunks << chunk + end + assert_equal 3, chunks.size, "Client instance get + chunks should work" + end +end \ No newline at end of file