diff --git a/src/turtle_publisher.erl b/src/turtle_publisher.erl index 50aeb7f..1e71c45 100644 --- a/src/turtle_publisher.erl +++ b/src/turtle_publisher.erl @@ -331,7 +331,7 @@ handle_deliver(Tag, #amqp_msg { payload = Payload, props = Props }, ok = amqp_channel:cast(Ch, #'basic.ack' { delivery_tag = Tag }), case track_lookup(CorrID, IF) of {ok, Pid, T, IF2} -> - T2 = turtle_time:monotonic_time(), + T2 = erlang:monotonic_time(), Pid ! {rpc_reply, {self(), CorrID}, T2 - T, Type, Payload}, {noreply, State#state { in_flight = IF2 }}; not_found -> @@ -375,7 +375,7 @@ publish({cast, undefined}, Pub, AMQPMsg, #state{ channel = Ch, confirms = true } publish({call, From}, Pub, AMQPMsg, #state{ channel = Ch, confirms = true, unacked = UA } = State) -> Seq = amqp_channel:next_publish_seqno(Ch), ok = amqp_channel:call(Ch, Pub, AMQPMsg), - T = turtle_time:monotonic_time(), + T = erlang:monotonic_time(), {noreply, State#state{ unacked = gb_trees:insert(Seq, {From, T}, UA) }}; publish({rpc_call, From}, Pub, AMQPMsg, #state { @@ -391,7 +391,7 @@ publish({rpc_call, From}, Pub, AMQPMsg, reply_to = ReplyQ, correlation_id = <> }}, ok = amqp_channel:call(Ch, Pub, WithReply), - T = turtle_time:monotonic_time(), + T = erlang:monotonic_time(), {noreply, State#state { corr_id = CorrID + 1, @@ -404,7 +404,7 @@ publish({rpc_call, From}, Pub, AMQPMsg, reply_to = ReplyQ, correlation_id = <> }}, ok = amqp_channel:call(Ch, Pub, WithReply), - T = turtle_time:monotonic_time(), + T = erlang:monotonic_time(), Opaque = {self(), CorrID}, {reply, {ok, Opaque}, State#state { @@ -415,7 +415,7 @@ publish({F, _X}, Pub, AMQPMsg, #state{ channel = Ch, confirms = false } = State) {reply, ok, State}. confirm(Reply, Seq, Multiple, #state { unacked = UA } = State) -> - T2 = turtle_time:monotonic_time(), + T2 = erlang:monotonic_time(), {Results, UA1} = remove_delivery_tags(Seq, Multiple, UA), reply_to_callers(T2, Reply, Results), {ok, State#state { unacked = UA1 }}. @@ -444,11 +444,11 @@ remove_delivery_tags(Seq, true, Unacked) -> reply_to_callers(_T2, _Reply, []) -> ok; reply_to_callers(T2, Reply, [{From, T1} | Callers]) -> - Window = turtle_time:convert_time_unit(T2 - T1, native, milli_seconds), + Window = erlang:convert_time_unit(T2 - T1, native, milli_seconds), gen_server:reply(From, {Reply, Window}), reply_to_callers(T2, Reply, Callers); reply_to_callers(T2, ack, [{rpc, From, T1, CorrID} | Callers]) -> - Window = turtle_time:convert_time_unit(T2 - T1, native, milli_seconds), + Window = erlang:convert_time_unit(T2 - T1, native, milli_seconds), Opaque = {self(), CorrID}, gen_server:reply(From, {ok, Opaque, Window}), reply_to_callers(T2, ack, Callers); diff --git a/src/turtle_subscriber.erl b/src/turtle_subscriber.erl index 6e8f01e..30c71a1 100644 --- a/src/turtle_subscriber.erl +++ b/src/turtle_subscriber.erl @@ -106,7 +106,7 @@ handle_info(Info, #state { handle_info = undefined } = State) -> lager:warning("Unknown info message: ~p", [Info]), {noreply, State}; handle_info(Info, #state { handle_info = HandleInfo, invoke_state = IState } = State) -> - S = turtle_time:monotonic_time(), + S = erlang:monotonic_time(), try HandleInfo(Info, IState) of {ok, IState2} -> {noreply, State#state { invoke_state = IState2 }}; {Cmds, IState2} when is_list(Cmds) -> @@ -154,14 +154,14 @@ handle_deliver_bulk({#'basic.deliver' {delivery_tag = DTag, routing_key = Key}, channel = Channel, conn_name = CN, name = N } = State) -> - S = turtle_time:monotonic_time(), + S = erlang:monotonic_time(), Tag = {DTag, ReplyTo, CorrID}, try handle_message(Tag, Fun, Key, Content, IState) of {[], S2} -> - E = turtle_time:monotonic_time(), + E = erlang:monotonic_time(), exometer:update([CN, N, msgs], 1), exometer:update([CN, N, latency], - turtle_time:convert_time_unit(E-S, native, milli_seconds)), + erlang:convert_time_unit(E-S, native, milli_seconds)), {noreply, State#state { invoke_state = S2 }}; {Cmds, S2} when is_list(Cmds) -> handle_commands(S, Cmds, State#state { invoke_state = S2 }) @@ -180,7 +180,7 @@ handle_deliver_single({#'basic.deliver' {delivery_tag = DTag, routing_key = Key} correlation_id = CorrID, reply_to = ReplyTo }} = Content}, #state { invoke = Fun, invoke_state = IState,channel = Channel } = State) -> - S = turtle_time:monotonic_time(), + S = erlang:monotonic_time(), Tag = {DTag, ReplyTo, CorrID}, try %% Transform a single message into the style of bulk messages @@ -210,24 +210,24 @@ handle_commands(S, [C | Next], #state { channel = Channel, conn_name = CN, name = N } = State) -> case C of {ack, Tag} -> - E = turtle_time:monotonic_time(), + E = erlang:monotonic_time(), exometer:update([CN, N, msgs], 1), exometer:update([CN, N, latency], - turtle_time:convert_time_unit(E-S, native, milli_seconds)), + erlang:convert_time_unit(E-S, native, milli_seconds)), ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag) }), handle_commands(S, Next, State); {bulk_ack, Tag} -> - E = turtle_time:monotonic_time(), + E = erlang:monotonic_time(), exometer:update([CN, N, msgs], 1), exometer:update([CN, N, latency], - turtle_time:convert_time_unit(E-S, native, milli_seconds)), + erlang:convert_time_unit(E-S, native, milli_seconds)), ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag), multiple = true }), handle_commands(S, Next, State); {bulk_nack, Tag} -> - E = turtle_time:monotonic_time(), + E = erlang:monotonic_time(), exometer:update([CN, N, msgs], 1), exometer:update([CN, N, latency], - turtle_time:convert_time_unit(E-S, native, milli_seconds)), + erlang:convert_time_unit(E-S, native, milli_seconds)), ok = amqp_channel:cast(Channel, #'basic.nack' { delivery_tag = delivery_tag(Tag), multiple = true }), handle_commands(S, Next, State); {reject, Tag} -> @@ -241,10 +241,10 @@ handle_commands(S, [C | Next], #'basic.reject' { delivery_tag = delivery_tag(Tag), requeue = false}), handle_commands(S, Next, State); {reply, Tag, CType, Msg} -> - E = turtle_time:monotonic_time(), + E = erlang:monotonic_time(), exometer:update([CN, N, msgs], 1), exometer:update([CN, N, latency], - turtle_time:convert_time_unit(E-S, native, milli_seconds)), + erlang:convert_time_unit(E-S, native, milli_seconds)), reply(Channel, Tag, CType, Msg), ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag) }), handle_commands(S, Next, State); @@ -338,7 +338,7 @@ mode(#{}) -> single. delivery_tag({Tag, _ReplyTo, _CorrID}) -> Tag. shutdown(Reason, #state { handle_info = HandleInfo, invoke_state = IState } = State) -> - S = turtle_time:monotonic_time(), + S = erlang:monotonic_time(), try HandleInfo({amqp_shutdown, Reason}, IState) of {ok, IState2} -> {noreply, State#state { invoke_state = IState2 }}; {Cmds, IState2} when is_list(Cmds) -> diff --git a/src/turtle_time.erl b/src/turtle_time.erl deleted file mode 100644 index 843dcdd..0000000 --- a/src/turtle_time.erl +++ /dev/null @@ -1,272 +0,0 @@ -%%% @private --module(turtle_time). -%% We don't want warnings about the use of erlang:now/0 in -%% this module. --compile(nowarn_deprecated_function). -%% -%% We don't use -%% -compile({nowarn_deprecated_function, [{erlang, now, 0}]}). -%% since this will produce warnings when compiled on systems -%% where it has not yet been deprecated. -%% - --export([monotonic_time/0, - monotonic_time/1, - erlang_system_time/0, - erlang_system_time/1, - os_system_time/0, - os_system_time/1, - time_offset/0, - time_offset/1, - convert_time_unit/3, - timestamp/0, - unique_integer/0, - unique_integer/1, - monitor/2, - system_info/1, - system_flag/2]). - -monotonic_time() -> - try - erlang:monotonic_time() - catch - error:undef -> - %% Use Erlang system time as monotonic time - erlang_system_time_fallback() - end. - -monotonic_time(Unit) -> - try - erlang:monotonic_time(Unit) - catch - error:badarg -> - erlang:error(badarg, [Unit]); - error:undef -> - %% Use Erlang system time as monotonic time - STime = erlang_system_time_fallback(), - try - convert_time_unit_fallback(STime, native, Unit) - catch - error:bad_time_unit -> erlang:error(badarg, [Unit]) - end - end. - -erlang_system_time() -> - try - erlang:system_time() - catch - error:undef -> - erlang_system_time_fallback() - end. - -erlang_system_time(Unit) -> - try - erlang:system_time(Unit) - catch - error:badarg -> - erlang:error(badarg, [Unit]); - error:undef -> - STime = erlang_system_time_fallback(), - try - convert_time_unit_fallback(STime, native, Unit) - catch - error:bad_time_unit -> erlang:error(badarg, [Unit]) - end - end. - -os_system_time() -> - try - os:system_time() - catch - error:undef -> - os_system_time_fallback() - end. - -os_system_time(Unit) -> - try - os:system_time(Unit) - catch - error:badarg -> - erlang:error(badarg, [Unit]); - error:undef -> - STime = os_system_time_fallback(), - try - convert_time_unit_fallback(STime, native, Unit) - catch - error:bad_time_unit -> erlang:error(badarg, [Unit]) - end - end. - -time_offset() -> - try - erlang:time_offset() - catch - error:undef -> - %% Erlang system time and Erlang monotonic - %% time are always aligned - 0 - end. - -time_offset(Unit) -> - try - erlang:time_offset(Unit) - catch - error:badarg -> - erlang:error(badarg, [Unit]); - error:undef -> - try - _ = integer_time_unit(Unit) - catch - error:bad_time_unit -> erlang:error(badarg, [Unit]) - end, - %% Erlang system time and Erlang monotonic - %% time are always aligned - 0 - end. - -convert_time_unit(Time, FromUnit, ToUnit) -> - try - erlang:convert_time_unit(Time, FromUnit, ToUnit) - catch - error:undef -> - try - convert_time_unit_fallback(Time, FromUnit, ToUnit) - catch - _:_ -> - erlang:error(badarg, [Time, FromUnit, ToUnit]) - end; - error:Error -> - erlang:error(Error, [Time, FromUnit, ToUnit]) - end. - -timestamp() -> - try - erlang:timestamp() - catch - error:undef -> - erlang:now() - end. - -unique_integer() -> - try - erlang:unique_integer() - catch - error:undef -> - {MS, S, US} = erlang:now(), - (MS*1000000+S)*1000000+US - end. - -unique_integer(Modifiers) -> - try - erlang:unique_integer(Modifiers) - catch - error:badarg -> - erlang:error(badarg, [Modifiers]); - error:undef -> - case is_valid_modifier_list(Modifiers) of - true -> - %% now() converted to an integer - %% fullfill the requirements of - %% all modifiers: unique, positive, - %% and monotonic... - {MS, S, US} = erlang:now(), - (MS*1000000+S)*1000000+US; - false -> - erlang:error(badarg, [Modifiers]) - end - end. - -monitor(Type, Item) -> - try - erlang:monitor(Type, Item) - catch - error:Error -> - case {Error, Type, Item} of - {badarg, time_offset, clock_service} -> - %% Time offset is final and will never change. - %% Return a dummy reference, there will never - %% be any need for 'CHANGE' messages... - make_ref(); - _ -> - erlang:error(Error, [Type, Item]) - end - end. - -system_info(Item) -> - try - erlang:system_info(Item) - catch - error:badarg -> - case Item of - time_correction -> - case erlang:system_info(tolerant_timeofday) of - enabled -> true; - disabled -> false - end; - time_warp_mode -> - no_time_warp; - time_offset -> - final; - NotSupArg when NotSupArg == os_monotonic_time_source; - NotSupArg == os_system_time_source; - NotSupArg == start_time; - NotSupArg == end_time -> - %% Cannot emulate this... - erlang:error(notsup, [NotSupArg]); - _ -> - erlang:error(badarg, [Item]) - end; - error:Error -> - erlang:error(Error, [Item]) - end. - -system_flag(Flag, Value) -> - try - erlang:system_flag(Flag, Value) - catch - error:Error -> - case {Error, Flag, Value} of - {badarg, time_offset, finalize} -> - %% Time offset is final - final; - _ -> - erlang:error(Error, [Flag, Value]) - end - end. - -%% -%% Internal functions -%% - -integer_time_unit(native) -> 1000*1000; -integer_time_unit(nano_seconds) -> 1000*1000*1000; -integer_time_unit(micro_seconds) -> 1000*1000; -integer_time_unit(milli_seconds) -> 1000; -integer_time_unit(seconds) -> 1; -integer_time_unit(I) when is_integer(I), I > 0 -> I; -integer_time_unit(BadRes) -> erlang:error(bad_time_unit, [BadRes]). - -erlang_system_time_fallback() -> - {MS, S, US} = erlang:now(), - (MS*1000000+S)*1000000+US. - -os_system_time_fallback() -> - {MS, S, US} = os:timestamp(), - (MS*1000000+S)*1000000+US. - -convert_time_unit_fallback(Time, FromUnit, ToUnit) -> - FU = integer_time_unit(FromUnit), - TU = integer_time_unit(ToUnit), - case Time < 0 of - true -> TU*Time - (FU - 1); - false -> TU*Time - end div FU. - -is_valid_modifier_list([positive|Ms]) -> - is_valid_modifier_list(Ms); -is_valid_modifier_list([monotonic|Ms]) -> - is_valid_modifier_list(Ms); -is_valid_modifier_list([]) -> - true; -is_valid_modifier_list(_) -> - false.