diff --git a/src/basho_bench_worker.erl b/src/basho_bench_worker.erl index 0ab3dbf..1e66420 100644 --- a/src/basho_bench_worker.erl +++ b/src/basho_bench_worker.erl @@ -240,11 +240,10 @@ worker_idle_loop(State) -> ?INFO("Starting max worker: ~p on ~p~n", [self(), node()]), max_worker_run_loop(State); {rate, Rate} -> - %% Calculate mean interarrival time in in milliseconds. A - %% fixed rate worker can generate (at max) only 1k req/sec. - MeanArrival = 1000 / Rate, - ?INFO("Starting ~w ms/req fixed rate worker: ~p on ~p\n", [MeanArrival, self(), node()]), - rate_worker_run_loop(State, 1 / MeanArrival) + ?INFO( + "Starting ~w persec fixed rate worker: ~p on ~p\n", + [Rate, self(), node()]), + rate_worker_run_loop(State, Rate) end end. @@ -347,18 +346,127 @@ max_worker_run_loop(State) -> exit(ExitReason) end. -rate_worker_run_loop(State, Lambda) -> - %% Delay between runs using exponentially distributed delays to mimic - %% queue. - timer:sleep(trunc(basho_bench_stats:exponential(Lambda))), - case worker_next_op(State) of - {ok, State2} -> - case needs_shutdown(State2) of +rate_worker_run_loop(State, Rate) when Rate > 0, Rate =< 1000 -> + WorkerFun = fun worker_next_op/1, + ShutdownFun = fun needs_shutdown/1, + rate_worker_run_loop(State, Rate, WorkerFun, ShutdownFun). + +rate_worker_run_loop(State, Rate, WorkerFun, ShutdownFun) -> + rate_worker_run_loop(State, Rate, WorkerFun, ShutdownFun, 0, 1000 / Rate, os:timestamp()). + +rate_worker_run_loop(State, Rate, WorkerFun, ShutdownFun, Count, InitDelayMS, LastTS) -> + {UpdDelayMS, UpdTS} = + case (Count > 0) and (Count rem 100 == 0) of + true -> + ActualTimeMS = timer:now_diff(os:timestamp(), LastTS) / 1000, + ExpectedTimeMS = 1000 * (100 / Rate), + DiffTimeMS = ActualTimeMS - ExpectedTimeMS, + {max(0.0, ((100 * InitDelayMS) - DiffTimeMS) / 100), + os:timestamp()}; + _ -> + {InitDelayMS, LastTS} + end, + ThisDelay = trunc(-math:log(rand:uniform()) * UpdDelayMS), + timer:sleep(ThisDelay), + case WorkerFun(State) of + {ok, UpdState} -> + case ShutdownFun(UpdState) of true -> ok; false -> - rate_worker_run_loop(State2, Lambda) + rate_worker_run_loop( + UpdState, + Rate, + WorkerFun, ShutdownFun, + Count + 1, + UpdDelayMS, UpdTS) end; ExitReason -> exit(ExitReason) end. + + + + +%% =================================================================== +%% EUnit tests +%% =================================================================== +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + +rate_work_test_() -> + {timeout, 120, fun rate_worker_tester/0}. + +rate_worker_tester() -> + WorkerFunGen = + fun(N) -> + fun(I) -> + timer:sleep(round(rand:uniform() * 2 * N)), {ok, I + 1} + end + end, + ShutdownFunGen = + fun(C) -> + fun(I) -> I >= C end + end, + + % rate = 100, avg response_time = 2 ms, 1000 requests + {T0, ok} = + timer:tc( + fun rate_worker_run_loop/4, + [1, 100, WorkerFunGen(2), ShutdownFunGen(1000)] + ), + ?assert(approx(T0, 10 * 1000000)), + + % rate = 100, avg response_time = 5 ms, 1000 requests + {T1, ok} = + timer:tc( + fun rate_worker_run_loop/4, + [1, 100, WorkerFunGen(5), ShutdownFunGen(1000)] + ), + ?assert(approx(T1, 10 * 1000000)), + + % rate = 100, avg response_time = 15 ms, 1000 requests + % can't achieve rate - work like max + % The first 100 will take + 10ms each - so expected is + 1s + {T2, ok} = + timer:tc( + fun rate_worker_run_loop/4, + [1, 100, WorkerFunGen(15), ShutdownFunGen(1000)] + ), + ?assert(approx(T2, (15 + 1) * 1000000)), + + % rate = 50, avg response_time = 15 ms, 1000 requests + {T3, ok} = + timer:tc( + fun rate_worker_run_loop/4, + [1, 50, WorkerFunGen(15), ShutdownFunGen(1000)] + ), + ?assert(approx(T3, 20 * 1000000)), + + % rate = 200, avg response_time = 2 ms, 1000 requests + {T4, ok} = + timer:tc( + fun rate_worker_run_loop/4, + [1, 200, WorkerFunGen(2), ShutdownFunGen(1000)] + ), + ?assert(approx(T4, 5 * 1000000)), + + % rate = 500, avg response_time = 1 ms, 1000 requests + {T5, ok} = + timer:tc( + fun rate_worker_run_loop/4, + [1, 500, WorkerFunGen(1), ShutdownFunGen(1000)] + ), + ?assert(approx(T5, 2 * 1000000)) + . + +approx(TimeTaken, TimeExpected) + when TimeTaken > (TimeExpected * 0.9), + TimeTaken < (TimeExpected * 1.1) -> + true; +approx(TimeTaken, TimeExpected) -> + io:format("TimeTaken ~w but TimeExpected ~w~n", [TimeTaken, TimeExpected]), + false. + +-endif.