Skip to content

Commit 3da48bf

Browse files
committed
First version of distributed work basho_bench - it needs more work, this was primarily to get something work, and (fixed) remote code loading because traditional erl_prim_loader doesn't jive super well with escript.
1 parent cee8727 commit 3da48bf

4 files changed

Lines changed: 316 additions & 44 deletions

File tree

src/basho_bench.erl

Lines changed: 84 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
-module(basho_bench).
2323

2424
-export([main/1, md5/1]).
25-
2625
-include("basho_bench.hrl").
2726

2827
%% ====================================================================
@@ -53,6 +52,8 @@ main(Args) ->
5352
{error, {already_loaded, basho_bench}} -> ok
5453
end,
5554
register(basho_bench, self()),
55+
%% TODO: Move into a proper supervision tree, janky for now
56+
{ok, _Pid} = basho_bench_config:start_link(),
5657
basho_bench_config:set(test_id, BenchName),
5758

5859
application:load(lager),
@@ -93,15 +94,16 @@ main(Args) ->
9394
%% Copy the config into the test dir for posterity
9495
[ begin {ok, _} = file:copy(Config, filename:join(TestDir, filename:basename(Config))) end
9596
|| Config <- Configs ],
96-
97+
case basho_bench_config:get(distribute_work, false) of
98+
true -> setup_distributed_work();
99+
false -> ok
100+
end,
97101
%% Set our CWD to the test dir
98102
ok = file:set_cwd(TestDir),
99-
100103
log_dimensions(),
101104

102105
%% Run pre_hook for user code preconditions
103106
run_pre_hook(),
104-
105107
%% Spin up the application
106108
ok = basho_bench_app:start(),
107109

@@ -276,6 +278,84 @@ run_hook({Module, Function}) ->
276278
run_hook(no_op) ->
277279
no_op.
278280

281+
get_addr_args() ->
282+
{ok, IfAddrs} = inet:getifaddrs(),
283+
FlattAttrib = lists:flatten([IfAttrib || {_Ifname, IfAttrib} <- IfAddrs]),
284+
Addrs = proplists:get_all_values(addr, FlattAttrib),
285+
StrAddrs = lists:map(fun(X) -> inet:ntoa(X) end, Addrs),
286+
string:join(StrAddrs, " ").
287+
setup_distributed_work() ->
288+
io:format("Using cookie: ~p~n", [erlang:get_cookie()]),
289+
case node() of
290+
'nonode@nohost' ->
291+
?STD_ERR("Basho bench not started in distributed mode, and distribute_work = true~n", []),
292+
halt(1);
293+
_ -> ok
294+
end,
295+
{ok, _Pid} = erl_boot_server:start([]),
296+
%% Allow anyone to boot from me...I might want to lock this down this down at some point
297+
erl_boot_server:add_subnet({0,0,0,0}, {0,0,0,0}),
298+
%% This is cheating, horribly, but it's the only simple way to bypass net_adm:host_file()
299+
gen_server:start({global, pool_master}, pool, [], []),
300+
RemoteSpec = basho_bench_config:get(remote_nodes, []),
301+
Cookie = lists:flatten(erlang:atom_to_list(erlang:get_cookie())),
302+
Args = "-setcookie " ++ Cookie ++ " -loader inet -hosts " ++ get_addr_args(),
303+
[io:format("Starting slave {~p, ~p, ~p}~n", [Host, Name, Args]) || {Host, Name} <- RemoteSpec],
304+
Slaves = [ slave:start_link(Host, Name, Args) || {Host, Name} <- RemoteSpec],
305+
SlaveNames = [SlaveName || {ok, SlaveName} <- Slaves],
306+
[pool:attach(SlaveName) || SlaveName <- SlaveNames],
307+
CodePaths = code:get_path(),
308+
rpc:multicall(SlaveNames, code, set_path, [CodePaths]),
309+
Apps = [lager, basho_bench, getopt, bear, folsom, ibrowse, riakc, riak_pb, mochiweb, protobuffs, velvet, goldrush],
310+
[distribute_app(App, SlaveNames) || App <- Apps].
311+
% io:format("Running escript: ~p~n", [escript:script_name()]),
312+
%% Assumes I'm running basho_bench from an escript, otherwise the behaviour is undefined
313+
% EscriptFileName = escript:script_name(),
314+
% {ok, Sections} = escript:extract(EscriptFileName, [compile_source]),
315+
% io:format("Sections: ~p~n", [Sections]),
316+
317+
% Libdir = code:lib_dir(basho_bench),
318+
% LibdirLen = string:len(Libdir),
319+
% BashoBenchLibs = lists:filter(fun(X) -> string:substr(X, 1, LibdirLen) == Libdir end, code:get_path()),
320+
% io:format("Basho Bench Library Directoryies: ~p~n", [BashoBenchLibs]),
321+
% io:format("Basho_bench lib dir: ~p~n", [Libdir]),
322+
% io:format("Libdir: ~p~n", [erl_prim_loader:list_dir(Libdir)]),
323+
% io:format("Code Path: ~p~n", [code:get_path()]),
324+
325+
% io:format("Files? ~p~n", [erl_prim_loader:list_dir("/Users/sdhillon/repos/basho_bench/basho_bench/basho_bench/ebin")]),
326+
% io:format("Remote nodes: ~p~n", [nodes()]),
327+
% RemoteFiles = rpc:call('bb25@3c075477e55e-2.local', erl_prim_loader, list_dir, ["/Users/sdhillon/repos/basho_bench/basho_bench/basho_bench/ebin"]),
328+
% io:format("Remote files: ~p~n", [RemoteFiles]),
329+
330+
331+
deploy_module(Module, Nodes) ->
332+
{Module, Binary, Filename} = code:get_object_code(Module),
333+
rpc:multicall(Nodes, code, load_binary, [Module, Filename, Binary]).
334+
335+
distribute_app(App, Nodes) ->
336+
% :(. This is super hackish, it depends on a bunch of assumptions
337+
% But, unfortunately there are negative interactions with escript and slave nodes
338+
CodeExtension = code:objfile_extension(),
339+
LibDir = code:lib_dir(App),
340+
% Get what paths are in the code path that start with LibDir
341+
LibDirLen = string:len(LibDir),
342+
EbinsDir = lists:filter(fun(CodePathDir) -> string:substr(CodePathDir, 1, LibDirLen) == LibDir end, code:get_path()),
343+
StripEndFun = fun(Path) ->
344+
PathLen = string:len(Path),
345+
case string:substr(Path, PathLen - string:len(CodeExtension) + 1, string:len(Path)) of
346+
CodeExtension ->
347+
{true, string:substr(Path, 1, PathLen - string:len(CodeExtension))};
348+
_ -> false
349+
end
350+
end,
351+
EbinDirDistributeFun = fun(EbinDir) ->
352+
{ok, Beams} = erl_prim_loader:list_dir(EbinDir),
353+
Modules = lists:filtermap(StripEndFun, Beams),
354+
ModulesLoaded = lists:map(fun(X) -> code:load_abs(filename:join(EbinDir, X)) end, Modules),
355+
lists:foreach(fun({module, Module}) -> deploy_module(Module, Nodes) end, ModulesLoaded)
356+
end,
357+
lists:foreach(EbinDirDistributeFun, EbinsDir),
358+
ok.
279359
%% just a utility, should be in basho_bench_utils.erl
280360
%% but 's' is for multiple utilities, and so far this
281361
%% is the only one.

src/basho_bench_config.erl

Lines changed: 202 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -20,27 +20,62 @@
2020
%%
2121
%% -------------------------------------------------------------------
2222
-module(basho_bench_config).
23+
-behaviour(gen_server).
24+
25+
26+
-ifdef(TEST).
27+
-include_lib("eunit/include/eunit.hrl").
28+
-compile(export_all).
29+
-endif.
2330

2431
-export([load/1,
2532
normalize_ips/2,
2633
set/2,
2734
get/1, get/2]).
2835

36+
-export([start_link/0]).
37+
38+
% Gen server callbacks
39+
-export([code_change/3, init/1, terminate/2, handle_call/3]).
40+
2941
-include("basho_bench.hrl").
3042

43+
-record(basho_bench_config_state, {config_keys}).
44+
45+
-type state() :: #basho_bench_config_state{}.
3146
%% ===================================================================
3247
%% Public API
3348
%% ===================================================================
3449

50+
ensure_started() ->
51+
start_link().
52+
53+
start_link() ->
54+
gen_server:start_link({global, ?MODULE}, ?MODULE, [], []).
55+
56+
3557
load(Files) ->
36-
TermsList =
37-
[ case file:consult(File) of
38-
{ok, Terms} ->
39-
Terms;
40-
{error, Reason} ->
41-
?FAIL_MSG("Failed to parse config file ~s: ~p\n", [File, Reason])
42-
end || File <- Files ],
43-
load_config(lists:append(TermsList)).
58+
ensure_started(),
59+
gen_server:call({global, ?MODULE}, {load_files, Files}).
60+
61+
set(Key, Value) ->
62+
gen_server:call({global, ?MODULE}, {set, Key, Value}).
63+
64+
get(Key) ->
65+
case gen_server:call({global, ?MODULE}, {get, Key}) of
66+
{ok, Value} ->
67+
Value;
68+
error ->
69+
erlang:error("Missing configuration key", [Key])
70+
end.
71+
72+
get(Key, Default) ->
73+
case gen_server:call({global, ?MODULE}, {get, Key}) of
74+
{ok, Value} ->
75+
Value;
76+
error ->
77+
Default
78+
end.
4479

4580
%% @doc Normalize the list of IPs and Ports.
4681
%%
@@ -58,42 +93,177 @@ normalize_ips(IPs, DefultPort) ->
5893
end,
5994
lists:foldl(F, [], IPs).
6095

61-
set(Key, Value) ->
62-
ok = application:set_env(basho_bench, Key, Value).
6396

64-
get(Key) ->
65-
case application:get_env(basho_bench, Key) of
66-
{ok, Value} ->
67-
Value;
68-
undefined ->
69-
erlang:error("Missing configuration key", [Key])
70-
end.
7197

72-
get(Key, Default) ->
73-
case application:get_env(basho_bench, Key) of
74-
{ok, Value} ->
75-
Value;
76-
_ ->
77-
Default
78-
end.
7998

8099

81100
%% ===================================================================
82101
%% Internal functions
83102
%% ===================================================================
84103

85-
load_config([]) ->
86-
ok;
87-
load_config([{Key, Value} | Rest]) ->
88-
?MODULE:set(Key, Value),
89-
load_config(Rest);
90-
load_config([ Other | Rest]) ->
91-
?WARN("Ignoring non-tuple config value: ~p\n", [Other]),
92-
load_config(Rest).
93104

94105
normalize_ip_entry({IP, Ports}, Normalized, _) when is_list(Ports) ->
95106
[{IP, Port} || Port <- Ports] ++ Normalized;
96107
normalize_ip_entry({IP, Port}, Normalized, _) ->
97108
[{IP, Port}|Normalized];
98109
normalize_ip_entry(IP, Normalized, DefaultPort) ->
99110
[{IP, DefaultPort}|Normalized].
111+
112+
113+
%% ===
114+
%% Gen_server Functions
115+
%% ===
116+
117+
-spec init(term()) -> {ok, state()}.
118+
init(_Args) ->
119+
State = #basho_bench_config_state{config_keys = base_config()},
120+
{ok, State}.
121+
122+
-spec code_change(term(), state(), term()) -> {ok, state()}.
123+
code_change(_OldVsn, State, _Extra) ->
124+
{ok, State}.
125+
126+
-spec terminate(term(), state()) -> 'ok'.
127+
terminate(_Reason, _State) ->
128+
ok.
129+
130+
handle_call({load_files, FileNames}, _From, State) ->
131+
TermsList = get_keys_from_files(FileNames),
132+
ConfigKeys2 = load_termlist(TermsList, State#basho_bench_config_state.config_keys),
133+
State2 = State#basho_bench_config_state{config_keys = ConfigKeys2},
134+
{reply, ok, State2};
135+
handle_call({set, app_run_mode, Value}, _From, State) ->
136+
application:set_env(basho_bench_app, app_run_mode, Value),
137+
{reply, ok, State};
138+
handle_call({get, app_run_mode}, _From, State) ->
139+
case application:get_env(basho_bench_app, app_run_mode) of
140+
{ok, Value} ->
141+
{reply, {ok, Value}, State};
142+
undefined ->
143+
{reply, error, State}
144+
end;
145+
handle_call({set, Key, Value}, _From, State) ->
146+
NewKVs = orddict:store(Key, Value, State#basho_bench_config_state.config_keys),
147+
State2 = State#basho_bench_config_state{config_keys = NewKVs},
148+
{reply, ok, State2};
149+
handle_call({get, Key}, _From, State) ->
150+
Value = orddict:find(Key, State#basho_bench_config_state.config_keys),
151+
{reply, Value, State}.
152+
get_keys_from_files(Files) ->
153+
[ case file:consult(File) of
154+
{ok, Terms} ->
155+
Terms;
156+
{error, Reason} ->
157+
?FAIL_MSG("Failed to parse config file ~s: ~p\n", [File, Reason]),
158+
throw(invalid_config),
159+
notokay
160+
end || File <- Files ].
161+
162+
163+
load_termlist(TermList, ExistingConfig) ->
164+
NewKVs = lists:flatten(TermList),
165+
FoldFun = fun({Key, Value}, Accum) ->
166+
orddict:store(Key, Value, Accum)
167+
end,
168+
lists:foldl(FoldFun, ExistingConfig, NewKVs).
169+
170+
base_config() ->
171+
orddict:from_list([
172+
%% Run mode: How should basho_bench started as a separate node, or part of an
173+
%% other node. The default is standalone, other option is included.
174+
175+
%%
176+
%% Mode of load generation:
177+
%% max - Generate as many requests as possible per worker
178+
%% {rate, Rate} - Exp. distributed Mean reqs/sec
179+
%%
180+
{mode, {rate, 5}},
181+
182+
%%
183+
%% Default log level
184+
%%
185+
{log_level, debug},
186+
187+
%%
188+
%% Base test output directory
189+
%%
190+
{test_dir, "tests"},
191+
192+
%%
193+
%% Test duration (minutes)
194+
%%
195+
{duration, 5},
196+
197+
%%
198+
%% Number of concurrent workers
199+
%%
200+
{concurrent, 3},
201+
202+
%%
203+
%% Driver module for the current test
204+
%%
205+
{driver, basho_bench_driver_http_raw},
206+
207+
%%
208+
%% Operations (and associated mix). Note that
209+
%% the driver may not implement every operation.
210+
%%
211+
{operations, [{get, 4},
212+
{put, 4},
213+
{delete, 1}]},
214+
215+
%%
216+
%% Interval on which to report latencies and status (seconds)
217+
%%
218+
{report_interval, 10},
219+
220+
%%
221+
%% Key generators
222+
%%
223+
%% {uniform_int, N} - Choose a uniformly distributed integer between 0 and N
224+
%%
225+
{key_generator, {uniform_int, 100000}},
226+
227+
%%
228+
%% Value generators
229+
%%
230+
%% {fixed_bin, N} - Fixed size binary blob of N bytes
231+
%%
232+
{value_generator, {fixed_bin, 100}}
233+
]).
234+
-ifdef(TEST).
235+
load_files_test() ->
236+
%% Extracted from bitcask, and null test.
237+
KVs = [[{mode,max},
238+
{duration,1},
239+
{report_interval,1},
240+
{concurrent,8},
241+
{driver,basho_bench_driver_null},
242+
{key_generator,{partitioned_sequential_int,5000000}},
243+
{disable_sequential_int_progress_report,true},
244+
{value_generator,{fixed_bin,10248}},
245+
{operations,[{do_something,7},{an_error,1},{another_error,2}]}],
246+
[{mode,max},
247+
{duration,10},
248+
{concurrent,1},
249+
{driver,basho_bench_driver_bitcask},
250+
{key_generator,{int_to_bin_bigendian,{uniform_int,5000000}}},
251+
{value_generator,{fixed_bin,10000}},
252+
{operations,[{get,1},{put,1}]},
253+
{code_paths,["../../public/bitcask"]},
254+
{bitcask_dir,"/tmp/bitcask.bench"},
255+
{bitcask_flags,[o_sync]}]],
256+
KVOrdDict = [{bitcask_dir,"/tmp/bitcask.bench"},
257+
{bitcask_flags,[o_sync]},
258+
{code_paths,["../../public/bitcask"]},
259+
{concurrent,1},
260+
{disable_sequential_int_progress_report,true},
261+
{driver,basho_bench_driver_bitcask},
262+
{duration,10},
263+
{key_generator,{int_to_bin_bigendian,{uniform_int,5000000}}},
264+
{mode,max},
265+
{operations,[{get,1},{put,1}]},
266+
{report_interval,1},
267+
{value_generator,{fixed_bin,10000}}],
268+
?assertEqual(KVOrdDict, load_termlist(KVs, [])).
269+
-endif.

0 commit comments

Comments
 (0)