Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/erlang.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ jobs:
release: "14.2"
usesh: true
prepare: |
pkg install -y erlang-runtime28 rebar3 cmake git gmake go llvm18
pkg install -y pcre2 erlang-runtime28 rebar3 cmake git gmake go llvm18
run: |
# Ensure Erlang 28 is in PATH
export PATH="/usr/local/lib/erlang28/bin:$PATH"
Expand Down
76 changes: 73 additions & 3 deletions src/hackney_conn.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1031,7 +1031,15 @@ streaming_body(internal, {send_headers_only, Method, Path, Headers}, Data) ->
streaming_body({call, From}, {send_body_chunk, BodyData}, #conn_data{protocol = http3} = Data) ->
%% HTTP/3 - send body chunk via QUIC
#conn_data{h3_conn = ConnRef, h3_stream_id = StreamId} = Data,
case hackney_h3:send_body_chunk(ConnRef, StreamId, iolist_to_binary(BodyData), false) of
Result = case BodyData of
Fun when is_function(Fun, 0) ->
stream_body_fun_h3(ConnRef, StreamId, Fun);
{Fun, State} when is_function(Fun, 1) ->
stream_body_fun_h3(ConnRef, StreamId, {Fun, State});
_ ->
hackney_h3:send_body_chunk(ConnRef, StreamId, iolist_to_binary(BodyData), false)
end,
case Result of
ok ->
{keep_state_and_data, [{reply, From, ok}]};
{error, Reason} ->
Expand All @@ -1041,8 +1049,15 @@ streaming_body({call, From}, {send_body_chunk, BodyData}, #conn_data{protocol =
streaming_body({call, From}, {send_body_chunk, BodyData}, Data) ->
#conn_data{transport = Transport, socket = Socket} = Data,
%% Send as chunked encoding (HTTP/1.1)
ChunkData = encode_chunk(BodyData),
case Transport:send(Socket, ChunkData) of
Result = case BodyData of
Fun when is_function(Fun, 0) ->
stream_body_fun(Transport, Socket, Fun);
{Fun, State} when is_function(Fun, 1) ->
stream_body_fun(Transport, Socket, {Fun, State});
_ ->
Transport:send(Socket, encode_chunk(BodyData))
end,
case Result of
ok ->
{keep_state_and_data, [{reply, From, ok}]};
{error, Reason} ->
Expand Down Expand Up @@ -1705,6 +1720,61 @@ encode_chunk(Data) when is_list(Data) ->
SizeHex = integer_to_binary(Size, 16),
[SizeHex, <<"\r\n">>, Data, <<"\r\n">>].

%% @private Stream body from stateless function
%% fun() -> {ok, Data} | eof | {error, Reason}
stream_body_fun(Transport, Socket, Fun) when is_function(Fun, 0) ->
case Fun() of
{ok, Data} ->
case Transport:send(Socket, encode_chunk(Data)) of
ok -> stream_body_fun(Transport, Socket, Fun);
Error -> Error
end;
eof ->
ok;
{error, _} = Error ->
Error
end;
%% @private Stream body from stateful function
%% fun(State) -> {ok, Data, NewState} | eof | {error, Reason}
stream_body_fun(Transport, Socket, {Fun, State}) when is_function(Fun, 1) ->
case Fun(State) of
{ok, Data, NewState} ->
case Transport:send(Socket, encode_chunk(Data)) of
ok -> stream_body_fun(Transport, Socket, {Fun, NewState});
Error -> Error
end;
eof ->
ok;
{error, _} = Error ->
Error
end.

%% @private Stream body from function for HTTP/3
stream_body_fun_h3(ConnRef, StreamId, Fun) when is_function(Fun, 0) ->
case Fun() of
{ok, Data} ->
case hackney_h3:send_body_chunk(ConnRef, StreamId, iolist_to_binary(Data), false) of
ok -> stream_body_fun_h3(ConnRef, StreamId, Fun);
Error -> Error
end;
eof ->
ok;
{error, _} = Error ->
Error
end;
stream_body_fun_h3(ConnRef, StreamId, {Fun, State}) when is_function(Fun, 1) ->
case Fun(State) of
{ok, Data, NewState} ->
case hackney_h3:send_body_chunk(ConnRef, StreamId, iolist_to_binary(Data), false) of
ok -> stream_body_fun_h3(ConnRef, StreamId, {Fun, NewState});
Error -> Error
end;
eof ->
ok;
{error, _} = Error ->
Error
end.

%% @private Get default User-Agent
default_ua() ->
Version = case application:get_key(hackney, vsn) of
Expand Down
109 changes: 109 additions & 0 deletions test/hackney_conn_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ hackney_conn_integration_test_() ->
{"simple GET request", {timeout, 30, fun test_get_request/0}},
{"POST request with body", {timeout, 30, fun test_post_request/0}},
{"stream body", {timeout, 30, fun test_stream_body/0}},
{"stream body with stateless function", {timeout, 30, fun test_stream_body_stateless_fun/0}},
{"stream body with stateful function", {timeout, 30, fun test_stream_body_stateful_fun/0}},
{"stream body function returns error", {timeout, 30, fun test_stream_body_fun_error/0}},
{"HEAD request", {timeout, 30, fun test_head_request/0}},
{"request returns to connected state", {timeout, 30, fun test_request_state_cycle/0}},
%% Async tests
Expand Down Expand Up @@ -306,6 +309,112 @@ test_stream_body() ->

hackney_conn:stop(Pid).

test_stream_body_stateless_fun() ->
Opts = #{
host => "127.0.0.1",
port => ?PORT,
transport => hackney_tcp,
connect_timeout => 5000,
recv_timeout => 5000
},
{ok, Pid} = hackney_conn:start_link(Opts),
ok = hackney_conn:connect(Pid),

%% Create stateless function using ets table for state
%% (function runs in hackney_conn process, not test process)
Chunks = [<<"chunk1">>, <<"chunk2">>, <<"chunk3">>],
Tab = ets:new(stream_test, [public, set]),
ets:insert(Tab, {chunks, Chunks}),
Fun = fun() ->
case ets:lookup(Tab, chunks) of
[{chunks, []}] -> eof;
[{chunks, [H | T]}] ->
ets:insert(Tab, {chunks, T}),
{ok, H}
end
end,

%% Send headers for streaming body (Host header required for HTTP/1.1)
Headers = [{<<"Host">>, <<"127.0.0.1:", (integer_to_binary(?PORT))/binary>>},
{<<"Content-Type">>, <<"text/plain">>}],
ok = hackney_conn:send_request_headers(Pid, <<"POST">>, <<"/post">>, Headers),

%% Send body using function
ok = hackney_conn:send_body_chunk(Pid, Fun),
ok = hackney_conn:finish_send_body(Pid),

%% Get response
{ok, Status, _RespHeaders, _} = hackney_conn:start_response(Pid),
?assert(Status >= 200 andalso Status < 300),

%% Read response body
{ok, RespBody} = hackney_conn:body(Pid),
?assert(is_binary(RespBody)),

hackney_conn:stop(Pid),
ets:delete(Tab).

test_stream_body_stateful_fun() ->
Opts = #{
host => "127.0.0.1",
port => ?PORT,
transport => hackney_tcp,
connect_timeout => 5000,
recv_timeout => 5000
},
{ok, Pid} = hackney_conn:start_link(Opts),
ok = hackney_conn:connect(Pid),

%% Stateful function: fun(State) -> {ok, Data, NewState} | eof
Chunks = [<<"hello ">>, <<"world">>, <<"!">>],
Fun = fun
([]) -> eof;
([H | T]) -> {ok, H, T}
end,

%% Host header required for HTTP/1.1
Headers = [{<<"Host">>, <<"127.0.0.1:", (integer_to_binary(?PORT))/binary>>},
{<<"Content-Type">>, <<"text/plain">>}],
ok = hackney_conn:send_request_headers(Pid, <<"POST">>, <<"/post">>, Headers),

%% Send body using stateful function
ok = hackney_conn:send_body_chunk(Pid, {Fun, Chunks}),
ok = hackney_conn:finish_send_body(Pid),

{ok, Status, _RespHeaders, _} = hackney_conn:start_response(Pid),
?assert(Status >= 200 andalso Status < 300),

{ok, RespBody} = hackney_conn:body(Pid),
?assert(is_binary(RespBody)),

hackney_conn:stop(Pid).

test_stream_body_fun_error() ->
Opts = #{
host => "127.0.0.1",
port => ?PORT,
transport => hackney_tcp,
connect_timeout => 5000,
recv_timeout => 5000
},
{ok, Pid} = hackney_conn:start_link(Opts),
ok = hackney_conn:connect(Pid),

%% Function that returns error immediately
Fun = fun() -> {error, simulated_error} end,

%% Host header required for HTTP/1.1
Headers = [{<<"Host">>, <<"127.0.0.1:", (integer_to_binary(?PORT))/binary>>}],
ok = hackney_conn:send_request_headers(Pid, <<"POST">>, <<"/post">>, Headers),

%% Should return error
{error, simulated_error} = hackney_conn:send_body_chunk(Pid, Fun),

%% Connection should be in closed state after error
timer:sleep(100),
?assertEqual({ok, closed}, hackney_conn:get_state(Pid)),
hackney_conn:stop(Pid).

test_head_request() ->
Opts = #{
host => "127.0.0.1",
Expand Down
Loading