Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 121 additions & 13 deletions src/basho_bench_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,10 @@ worker_idle_loop(State) ->
?INFO("Starting max worker: ~p on ~p~n", [self(), node()]),
max_worker_run_loop(State);
{rate, Rate} ->
%% Calculate mean interarrival time in in milliseconds. A
%% fixed rate worker can generate (at max) only 1k req/sec.
MeanArrival = 1000 / Rate,
?INFO("Starting ~w ms/req fixed rate worker: ~p on ~p\n", [MeanArrival, self(), node()]),
rate_worker_run_loop(State, 1 / MeanArrival)
?INFO(
"Starting ~w persec fixed rate worker: ~p on ~p\n",
[Rate, self(), node()]),
rate_worker_run_loop(State, Rate)
end
end.

Expand Down Expand Up @@ -347,18 +346,127 @@ max_worker_run_loop(State) ->
exit(ExitReason)
end.

rate_worker_run_loop(State, Lambda) ->
%% Delay between runs using exponentially distributed delays to mimic
%% queue.
timer:sleep(trunc(basho_bench_stats:exponential(Lambda))),
case worker_next_op(State) of
{ok, State2} ->
case needs_shutdown(State2) of
rate_worker_run_loop(State, Rate) when Rate > 0, Rate =< 1000 ->
WorkerFun = fun worker_next_op/1,
ShutdownFun = fun needs_shutdown/1,
rate_worker_run_loop(State, Rate, WorkerFun, ShutdownFun).

rate_worker_run_loop(State, Rate, WorkerFun, ShutdownFun) ->
rate_worker_run_loop(State, Rate, WorkerFun, ShutdownFun, 0, 1000 / Rate, os:timestamp()).

rate_worker_run_loop(State, Rate, WorkerFun, ShutdownFun, Count, InitDelayMS, LastTS) ->
{UpdDelayMS, UpdTS} =
case (Count > 0) and (Count rem 100 == 0) of
true ->
ActualTimeMS = timer:now_diff(os:timestamp(), LastTS) / 1000,
ExpectedTimeMS = 1000 * (100 / Rate),
DiffTimeMS = ActualTimeMS - ExpectedTimeMS,
{max(0.0, ((100 * InitDelayMS) - DiffTimeMS) / 100),
os:timestamp()};
_ ->
{InitDelayMS, LastTS}
end,
ThisDelay = trunc(-math:log(rand:uniform()) * UpdDelayMS),
timer:sleep(ThisDelay),
case WorkerFun(State) of
{ok, UpdState} ->
case ShutdownFun(UpdState) of
true ->
ok;
false ->
rate_worker_run_loop(State2, Lambda)
rate_worker_run_loop(
UpdState,
Rate,
WorkerFun, ShutdownFun,
Count + 1,
UpdDelayMS, UpdTS)
end;
ExitReason ->
exit(ExitReason)
end.




%% ===================================================================
%% EUnit tests
%% ===================================================================
-ifdef(TEST).

-include_lib("eunit/include/eunit.hrl").

rate_work_test_() ->
{timeout, 120, fun rate_worker_tester/0}.

rate_worker_tester() ->
WorkerFunGen =
fun(N) ->
fun(I) ->
timer:sleep(round(rand:uniform() * 2 * N)), {ok, I + 1}
end
end,
ShutdownFunGen =
fun(C) ->
fun(I) -> I >= C end
end,

% rate = 100, avg response_time = 2 ms, 1000 requests
{T0, ok} =
timer:tc(
fun rate_worker_run_loop/4,
[1, 100, WorkerFunGen(2), ShutdownFunGen(1000)]
),
?assert(approx(T0, 10 * 1000000)),

% rate = 100, avg response_time = 5 ms, 1000 requests
{T1, ok} =
timer:tc(
fun rate_worker_run_loop/4,
[1, 100, WorkerFunGen(5), ShutdownFunGen(1000)]
),
?assert(approx(T1, 10 * 1000000)),

% rate = 100, avg response_time = 15 ms, 1000 requests
% can't achieve rate - work like max
% The first 100 will take + 10ms each - so expected is + 1s
{T2, ok} =
timer:tc(
fun rate_worker_run_loop/4,
[1, 100, WorkerFunGen(15), ShutdownFunGen(1000)]
),
?assert(approx(T2, (15 + 1) * 1000000)),

% rate = 50, avg response_time = 15 ms, 1000 requests
{T3, ok} =
timer:tc(
fun rate_worker_run_loop/4,
[1, 50, WorkerFunGen(15), ShutdownFunGen(1000)]
),
?assert(approx(T3, 20 * 1000000)),

% rate = 200, avg response_time = 2 ms, 1000 requests
{T4, ok} =
timer:tc(
fun rate_worker_run_loop/4,
[1, 200, WorkerFunGen(2), ShutdownFunGen(1000)]
),
?assert(approx(T4, 5 * 1000000)),

% rate = 500, avg response_time = 1 ms, 1000 requests
{T5, ok} =
timer:tc(
fun rate_worker_run_loop/4,
[1, 500, WorkerFunGen(1), ShutdownFunGen(1000)]
),
?assert(approx(T5, 2 * 1000000))
.

approx(TimeTaken, TimeExpected)
when TimeTaken > (TimeExpected * 0.9),
TimeTaken < (TimeExpected * 1.1) ->
true;
approx(TimeTaken, TimeExpected) ->
io:format("TimeTaken ~w but TimeExpected ~w~n", [TimeTaken, TimeExpected]),
false.

-endif.