3232
3333-record (state , {
3434 pb_pid ,
35+ repl_pid ,
3536 http_host ,
3637 http_port ,
3738 recordBucket ,
@@ -98,6 +99,7 @@ new(Id) ->
9899 PBPort = basho_bench_config :get (pb_port , 8087 ),
99100 HTTPIPs = basho_bench_config :get (http_ips , [" 127.0.0.1" ]),
100101 HTTPPort = basho_bench_config :get (http_port , 8098 ),
102+ ReplPBIPs = basho_bench_config :get (replpb_ips , [" 127.0.0.1" ]),
101103
102104 PBTimeout = basho_bench_config :get (pb_timeout_general , 30 * 1000 ),
103105 HTTPTimeout = basho_bench_config :get (http_timeout_general , 30 * 1000 ),
@@ -120,6 +122,12 @@ new(Id) ->
120122 ? INFO (" Using pb target ~p :~p for worker ~p \n " , [PBTargetIp ,
121123 PBTargetPort ,
122124 Id ]),
125+ ReplTargets = basho_bench_config :normalize_ips (ReplPBIPs , PBPort ),
126+ {ReplTargetIp ,
127+ ReplTargetPort } = lists :nth ((Id rem length (ReplTargets ) + 1 ),
128+ ReplTargets ),
129+ ? INFO (" Using repl target ~p :~p for worker ~p \n " ,
130+ [ReplTargetIp , ReplTargetPort , Id ]),
123131
124132 {AGMaxKC , AGMinKC , AGKeyOrder } =
125133 basho_bench_config :get (alwaysget , {1 , 1 , key_order }),
@@ -134,8 +142,17 @@ new(Id) ->
134142 case riakc_pb_socket :start_link (PBTargetIp , PBTargetPort ) of
135143 {ok , Pid } ->
136144 NominatedID = Id == 7 ,
145+ ReplPid =
146+ case riakc_pb_socket :start_link (ReplTargetIp , ReplTargetPort ) of
147+ {ok , RP } ->
148+ RP ;
149+ _ ->
150+ lager :info (" Starting with no repl check" ),
151+ no_repl_check
152+ end ,
137153 {ok , # state {
138154 pb_pid = Pid ,
155+ repl_pid = ReplPid ,
139156 http_host = HTTPTargetIp ,
140157 http_port = HTTPTargetPort ,
141158 recordBucket = <<" domainRecord" >>,
@@ -306,9 +323,16 @@ run(update_with2i, KeyGen, ValueGen, State) ->
306323 {error , Reason , State }
307324 end ;
308325% % Put an object with a unique key and a non-compressable value
309- run (put_unique , _KeyGen , _ValueGen , State ) ->
326+ run (put_unique_bet365 , _KeyGen , _ValueGen , State ) ->
310327 Pid = State # state .pb_pid ,
311- Bucket = State # state .documentBucket ,
328+
329+ Bucket =
330+ case erlang :phash2 (Pid ) rem 2 of
331+ 0 ->
332+ <<" abcdefghijklmnopqrstuvwxyz_1" >>;
333+ 1 ->
334+ <<" abcdefghijklmnopqrstuvwxyz_2" >>
335+ end ,
312336
313337 UKC = State # state .unique_key_count ,
314338 Key =
@@ -319,8 +343,8 @@ run(put_unique, _KeyGen, _ValueGen, State) ->
319343 Value = non_compressible_value (State # state .unique_size ),
320344
321345 Robj0 = riakc_obj :new (Bucket , to_binary (Key )),
322- MD1 = riakc_obj :get_update_metadata (Robj0 ),
323- MD2 = riakc_obj :set_secondary_index (MD1 , generate_binary_indexes ()),
346+ MD2 = riakc_obj :get_update_metadata (Robj0 ),
347+ % MD2 = riakc_obj:set_secondary_index(MD1, generate_binary_indexes()),
324348 Robj1 = riakc_obj :update_value (Robj0 , Value ),
325349 Robj2 = riakc_obj :update_metadata (Robj1 , MD2 ),
326350
@@ -331,6 +355,26 @@ run(put_unique, _KeyGen, _ValueGen, State) ->
331355 {error , Reason } ->
332356 {error , Reason , State }
333357 end ;
358+ % % Put an object with a unique key and a non-compressable value
359+ run (put_unique , _KeyGen , _ValueGen , State ) ->
360+ {Pid , _Bucket , _Key , Robj , UKC } = prepare_unique_put (State ),
361+ % % Write the object...
362+ case riakc_pb_socket :put (Pid , Robj , State # state .pb_timeout ) of
363+ ok ->
364+ {ok , State # state {unique_key_count = UKC + 1 }};
365+ {error , Reason } ->
366+ {error , Reason , State }
367+ end ;
368+ run (put_unique_checkrepl , _KeyGen , _ValueGen , State ) ->
369+ {Pid , Bucket , Key , Robj , UKC } = prepare_unique_put (State ),
370+ % % Write the object...
371+ case riakc_pb_socket :put (Pid , Robj , State # state .pb_timeout ) of
372+ ok ->
373+ check_repl (State # state .repl_pid , Bucket , to_binary (Key ), State # state .pb_timeout ),
374+ {ok , State # state {unique_key_count = UKC + 1 }};
375+ {error , Reason } ->
376+ {error , Reason , State }
377+ end ;
334378run (get_unique , _KeyGen , _ValueGen , State ) ->
335379 % Get one of the objects with unique keys
336380 Pid = State # state .pb_pid ,
@@ -514,6 +558,24 @@ run(Other, _, _, _) ->
514558% % ====================================================================
515559
516560
561+ prepare_unique_put (State ) ->
562+ Pid = State # state .pb_pid ,
563+ Bucket = State # state .documentBucket ,
564+
565+ UKC = State # state .unique_key_count ,
566+ Key =
567+ generate_uniquekey (UKC ,
568+ State # state .keyid ,
569+ State # state .unique_keyorder ),
570+
571+ Value = non_compressible_value (State # state .unique_size ),
572+
573+ Robj0 = riakc_obj :new (Bucket , to_binary (Key )),
574+ MD1 = riakc_obj :get_update_metadata (Robj0 ),
575+ MD2 = riakc_obj :set_secondary_index (MD1 , generate_binary_indexes ()),
576+ Robj1 = riakc_obj :update_value (Robj0 , Value ),
577+ Robj2 = riakc_obj :update_metadata (Robj1 , MD2 ),
578+ {Pid , Bucket , Key , Robj2 , UKC }.
517579
518580json_get (Url , Timeout ) ->
519581 json_get (Url , Timeout , true ).
@@ -561,6 +623,15 @@ ensure_module(Module) ->
561623 ok
562624 end .
563625
626+ check_repl (no_repl_check , _B , _K , _TO ) ->
627+ ok ;
628+ check_repl (ReplPid , Bucket , Key , Timeout ) ->
629+ case riakc_pb_socket :get (ReplPid , Bucket , Key , Timeout ) of
630+ {ok , _Obj } ->
631+ ok ;
632+ {error , _ } ->
633+ check_repl (ReplPid , Bucket , Key , Timeout )
634+ end .
564635
565636% % ====================================================================
566637% % Spawned Runners
@@ -727,4 +798,4 @@ convert_tolist(I) when is_integer(I) ->
727798 list_to_binary (lists :flatten (io_lib :format (" ~9..0B " , [I ])));
728799convert_tolist (Bin ) ->
729800 <<I :26 /integer , _Tail :6 /bitstring >> = Bin ,
730- convert_tolist (I ).
801+ convert_tolist (I ).
0 commit comments