diff --git a/.github/workflows/erlang.yml b/.github/workflows/erlang.yml index 81efec93..c5101cf7 100644 --- a/.github/workflows/erlang.yml +++ b/.github/workflows/erlang.yml @@ -101,7 +101,7 @@ jobs: release: "14.2" usesh: true prepare: | - pkg install -y erlang-runtime28 rebar3 cmake git gmake go llvm18 + pkg install -y pcre2 erlang-runtime28 rebar3 cmake git gmake go llvm18 run: | # Ensure Erlang 28 is in PATH export PATH="/usr/local/lib/erlang28/bin:$PATH" diff --git a/src/hackney_conn.erl b/src/hackney_conn.erl index fc573181..16aa148c 100644 --- a/src/hackney_conn.erl +++ b/src/hackney_conn.erl @@ -1031,7 +1031,15 @@ streaming_body(internal, {send_headers_only, Method, Path, Headers}, Data) -> streaming_body({call, From}, {send_body_chunk, BodyData}, #conn_data{protocol = http3} = Data) -> %% HTTP/3 - send body chunk via QUIC #conn_data{h3_conn = ConnRef, h3_stream_id = StreamId} = Data, - case hackney_h3:send_body_chunk(ConnRef, StreamId, iolist_to_binary(BodyData), false) of + Result = case BodyData of + Fun when is_function(Fun, 0) -> + stream_body_fun_h3(ConnRef, StreamId, Fun); + {Fun, State} when is_function(Fun, 1) -> + stream_body_fun_h3(ConnRef, StreamId, {Fun, State}); + _ -> + hackney_h3:send_body_chunk(ConnRef, StreamId, iolist_to_binary(BodyData), false) + end, + case Result of ok -> {keep_state_and_data, [{reply, From, ok}]}; {error, Reason} -> @@ -1041,8 +1049,15 @@ streaming_body({call, From}, {send_body_chunk, BodyData}, #conn_data{protocol = streaming_body({call, From}, {send_body_chunk, BodyData}, Data) -> #conn_data{transport = Transport, socket = Socket} = Data, %% Send as chunked encoding (HTTP/1.1) - ChunkData = encode_chunk(BodyData), - case Transport:send(Socket, ChunkData) of + Result = case BodyData of + Fun when is_function(Fun, 0) -> + stream_body_fun(Transport, Socket, Fun); + {Fun, State} when is_function(Fun, 1) -> + stream_body_fun(Transport, Socket, {Fun, State}); + _ -> + Transport:send(Socket, encode_chunk(BodyData)) + end, + case Result of ok -> {keep_state_and_data, [{reply, From, ok}]}; {error, Reason} -> @@ -1705,6 +1720,61 @@ encode_chunk(Data) when is_list(Data) -> SizeHex = integer_to_binary(Size, 16), [SizeHex, <<"\r\n">>, Data, <<"\r\n">>]. +%% @private Stream body from stateless function +%% fun() -> {ok, Data} | eof | {error, Reason} +stream_body_fun(Transport, Socket, Fun) when is_function(Fun, 0) -> + case Fun() of + {ok, Data} -> + case Transport:send(Socket, encode_chunk(Data)) of + ok -> stream_body_fun(Transport, Socket, Fun); + Error -> Error + end; + eof -> + ok; + {error, _} = Error -> + Error + end; +%% @private Stream body from stateful function +%% fun(State) -> {ok, Data, NewState} | eof | {error, Reason} +stream_body_fun(Transport, Socket, {Fun, State}) when is_function(Fun, 1) -> + case Fun(State) of + {ok, Data, NewState} -> + case Transport:send(Socket, encode_chunk(Data)) of + ok -> stream_body_fun(Transport, Socket, {Fun, NewState}); + Error -> Error + end; + eof -> + ok; + {error, _} = Error -> + Error + end. + +%% @private Stream body from function for HTTP/3 +stream_body_fun_h3(ConnRef, StreamId, Fun) when is_function(Fun, 0) -> + case Fun() of + {ok, Data} -> + case hackney_h3:send_body_chunk(ConnRef, StreamId, iolist_to_binary(Data), false) of + ok -> stream_body_fun_h3(ConnRef, StreamId, Fun); + Error -> Error + end; + eof -> + ok; + {error, _} = Error -> + Error + end; +stream_body_fun_h3(ConnRef, StreamId, {Fun, State}) when is_function(Fun, 1) -> + case Fun(State) of + {ok, Data, NewState} -> + case hackney_h3:send_body_chunk(ConnRef, StreamId, iolist_to_binary(Data), false) of + ok -> stream_body_fun_h3(ConnRef, StreamId, {Fun, NewState}); + Error -> Error + end; + eof -> + ok; + {error, _} = Error -> + Error + end. + %% @private Get default User-Agent default_ua() -> Version = case application:get_key(hackney, vsn) of diff --git a/test/hackney_conn_tests.erl b/test/hackney_conn_tests.erl index 09c853d8..d31658d1 100644 --- a/test/hackney_conn_tests.erl +++ b/test/hackney_conn_tests.erl @@ -42,6 +42,9 @@ hackney_conn_integration_test_() -> {"simple GET request", {timeout, 30, fun test_get_request/0}}, {"POST request with body", {timeout, 30, fun test_post_request/0}}, {"stream body", {timeout, 30, fun test_stream_body/0}}, + {"stream body with stateless function", {timeout, 30, fun test_stream_body_stateless_fun/0}}, + {"stream body with stateful function", {timeout, 30, fun test_stream_body_stateful_fun/0}}, + {"stream body function returns error", {timeout, 30, fun test_stream_body_fun_error/0}}, {"HEAD request", {timeout, 30, fun test_head_request/0}}, {"request returns to connected state", {timeout, 30, fun test_request_state_cycle/0}}, %% Async tests @@ -306,6 +309,112 @@ test_stream_body() -> hackney_conn:stop(Pid). +test_stream_body_stateless_fun() -> + Opts = #{ + host => "127.0.0.1", + port => ?PORT, + transport => hackney_tcp, + connect_timeout => 5000, + recv_timeout => 5000 + }, + {ok, Pid} = hackney_conn:start_link(Opts), + ok = hackney_conn:connect(Pid), + + %% Create stateless function using ets table for state + %% (function runs in hackney_conn process, not test process) + Chunks = [<<"chunk1">>, <<"chunk2">>, <<"chunk3">>], + Tab = ets:new(stream_test, [public, set]), + ets:insert(Tab, {chunks, Chunks}), + Fun = fun() -> + case ets:lookup(Tab, chunks) of + [{chunks, []}] -> eof; + [{chunks, [H | T]}] -> + ets:insert(Tab, {chunks, T}), + {ok, H} + end + end, + + %% Send headers for streaming body (Host header required for HTTP/1.1) + Headers = [{<<"Host">>, <<"127.0.0.1:", (integer_to_binary(?PORT))/binary>>}, + {<<"Content-Type">>, <<"text/plain">>}], + ok = hackney_conn:send_request_headers(Pid, <<"POST">>, <<"/post">>, Headers), + + %% Send body using function + ok = hackney_conn:send_body_chunk(Pid, Fun), + ok = hackney_conn:finish_send_body(Pid), + + %% Get response + {ok, Status, _RespHeaders, _} = hackney_conn:start_response(Pid), + ?assert(Status >= 200 andalso Status < 300), + + %% Read response body + {ok, RespBody} = hackney_conn:body(Pid), + ?assert(is_binary(RespBody)), + + hackney_conn:stop(Pid), + ets:delete(Tab). + +test_stream_body_stateful_fun() -> + Opts = #{ + host => "127.0.0.1", + port => ?PORT, + transport => hackney_tcp, + connect_timeout => 5000, + recv_timeout => 5000 + }, + {ok, Pid} = hackney_conn:start_link(Opts), + ok = hackney_conn:connect(Pid), + + %% Stateful function: fun(State) -> {ok, Data, NewState} | eof + Chunks = [<<"hello ">>, <<"world">>, <<"!">>], + Fun = fun + ([]) -> eof; + ([H | T]) -> {ok, H, T} + end, + + %% Host header required for HTTP/1.1 + Headers = [{<<"Host">>, <<"127.0.0.1:", (integer_to_binary(?PORT))/binary>>}, + {<<"Content-Type">>, <<"text/plain">>}], + ok = hackney_conn:send_request_headers(Pid, <<"POST">>, <<"/post">>, Headers), + + %% Send body using stateful function + ok = hackney_conn:send_body_chunk(Pid, {Fun, Chunks}), + ok = hackney_conn:finish_send_body(Pid), + + {ok, Status, _RespHeaders, _} = hackney_conn:start_response(Pid), + ?assert(Status >= 200 andalso Status < 300), + + {ok, RespBody} = hackney_conn:body(Pid), + ?assert(is_binary(RespBody)), + + hackney_conn:stop(Pid). + +test_stream_body_fun_error() -> + Opts = #{ + host => "127.0.0.1", + port => ?PORT, + transport => hackney_tcp, + connect_timeout => 5000, + recv_timeout => 5000 + }, + {ok, Pid} = hackney_conn:start_link(Opts), + ok = hackney_conn:connect(Pid), + + %% Function that returns error immediately + Fun = fun() -> {error, simulated_error} end, + + %% Host header required for HTTP/1.1 + Headers = [{<<"Host">>, <<"127.0.0.1:", (integer_to_binary(?PORT))/binary>>}], + ok = hackney_conn:send_request_headers(Pid, <<"POST">>, <<"/post">>, Headers), + + %% Should return error + {error, simulated_error} = hackney_conn:send_body_chunk(Pid, Fun), + + %% Connection should be in closed state after error + timer:sleep(100), + ?assertEqual({ok, closed}, hackney_conn:get_state(Pid)), + hackney_conn:stop(Pid). + test_head_request() -> Opts = #{ host => "127.0.0.1",