4141 report_interval ,
4242 errors_since_last_report = false ,
4343 summary_file ,
44- errors_file }).
44+ errors_file ,
45+ last_warn = {0 ,0 ,0 }}).
4546
47+ -define (WARN_INTERVAL , 1000 ). % Warn once a second
4648% % ====================================================================
4749% % API
4850% % ====================================================================
@@ -60,9 +62,14 @@ op_complete(Op, ok, ElapsedUs) ->
6062 op_complete (Op , {ok , 1 }, ElapsedUs );
6163op_complete (Op , {ok , Units }, ElapsedUs ) ->
6264 % % Update the histogram and units counter for the op in question
63- % folsom_metrics:notify({latencies, Op}, ElapsedUs),
64- % folsom_metrics:notify({units, Op}, {inc, Units}),
65- gen_server :cast ({global , ? MODULE }, {Op , {ok , Units }, ElapsedUs }),
65+ % io:format("Get distributed: ~p~n", [get_distributed()]),
66+ case get_distributed () of
67+ true ->
68+ gen_server :cast ({global , ? MODULE }, {Op , {ok , Units }, ElapsedUs });
69+ false ->
70+ folsom_metrics :notify ({latencies , Op }, ElapsedUs ),
71+ folsom_metrics :notify ({units , Op }, {inc , Units })
72+ end ,
6673 ok ;
6774op_complete (Op , Result , ElapsedUs ) ->
6875 gen_server :call ({global , ? MODULE }, {op , Op , Result , ElapsedUs }).
@@ -141,10 +148,21 @@ handle_call({op, Op, {error, Reason}, _ElapsedUs}, _From, State) ->
141148 increment_error_counter ({Op , Reason }),
142149 {reply , ok , State # state { errors_since_last_report = true }}.
143150
144- handle_cast ({Op , {ok , Units }, ElapsedUs }, State ) ->
151+ handle_cast ({Op , {ok , Units }, ElapsedUs }, State = # state {last_write_time = LWT , report_interval = RI }) ->
152+ TimeSinceLastReport = timer :now_diff (os :timestamp (), LWT ) / 1000 , % % To get the diff in seconds
153+ TimeSinceLastWarn = timer :now_diff (os :timestamp (), State # state .last_warn ) / 1000 ,
154+ if
155+ TimeSinceLastReport > (RI * 2 ) andalso TimeSinceLastWarn > ? WARN_INTERVAL ->
156+ ? WARN (" basho_bench_stats has not reported in ~.2f milliseconds" , [TimeSinceLastReport ]),
157+ {message_queue_len , QLen } = process_info (self (), message_queue_len ),
158+ ? WARN (" stats process mailbox size = ~w " , [QLen ]),
159+ NewState = State # state {last_warn = os :timestamp ()};
160+ true ->
161+ NewState = State
162+ end ,
145163 folsom_metrics :notify ({latencies , Op }, ElapsedUs ),
146164 folsom_metrics :notify ({units , Op }, {inc , Units }),
147- {noreply , State };
165+ {noreply , NewState };
148166handle_cast (_ , State ) ->
149167 {noreply , State }.
150168
@@ -173,6 +191,21 @@ code_change(_OldVsn, State, _Extra) ->
173191% % Internal functions
174192% % ====================================================================
175193
194+ % % Uses the process dictionary to memoize checks
195+ % % for checking if we're running in distributed mode
196+ % % as constantly checking in with a centralized gen_server
197+ % % would impede progress
198+
199+ get_distributed () ->
200+ case erlang :get (distribute_work ) of
201+ undefined ->
202+ DistributeWork = basho_bench_config :get (distribute_work , false ),
203+ erlang :put (distribute_work , DistributeWork ),
204+ DistributeWork ;
205+ DistributeWork ->
206+ DistributeWork
207+ end .
208+
176209op_csv_file ({Label , _Op }) ->
177210 Fname = normalize_label (Label ) ++ " _latencies.csv" ,
178211 {ok , F } = file :open (Fname , [raw , binary , write ]),
0 commit comments