Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## v0.3.4 (TBD)

### Enhancements

* (#65) Add an iterator interface for Range requests.

## v0.3.3 (2025-12-21)

### Enhancements
Expand Down
2 changes: 1 addition & 1 deletion src/erlfdb.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

{application, erlfdb, [
{description, "Erlang client for FoundationDB"},
{vsn, "0.3.3"},
{vsn, "0.3.4"},
{registered, []},
{applications, [kernel, stdlib]},
{maintainers, ["Jesse Stimpson"]},
Expand Down
136 changes: 136 additions & 0 deletions src/erlfdb_iterator.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
-module(erlfdb_iterator).

-define(DOCATTRS, ?OTP_RELEASE >= 27).

-if(?DOCATTRS).
-moduledoc """
A generic iterator behaviour, useful for streaming results from the database.
""".
-endif.

%% Implementation-defined state that is threaded through every callback.
-type state() :: term().
%% An iterator is the callback module paired with its current state.
-type iterator() :: {module(), state()}.
%% One item produced by an iterator step; its shape is chosen by the callback module.
-type result() :: term().

%% Invoked by call/2 as handle_call(Call, Ref, State), where Ref is a fresh
%% reference from make_ref(). The reply is handed back to the caller of call/2
%% together with an iterator wrapping the returned state.
-callback handle_call(term(), term(), state()) -> {reply, term(), state()}.
%% Invoked by next/1 to advance one step. `cont` carries results and signals
%% that more steps may follow; `halt` ends the iteration, with or without a
%% final batch of results.
-callback handle_next(state()) ->
{cont, [result()], state()} | {halt, [result()], state()} | {halt, state()}.
%% Invoked by stop/1, which asserts that the return value is `ok`.
-callback handle_stop(state()) -> ok.

%% Callback modules may omit handle_call/3; call/2 is then unavailable for them.
-optional_callbacks([handle_call/3]).

-export_type([iterator/0, result/0]).

-export([new/2, next/1, stop/1, run/1, call/2, pipeline/1, module_state/1]).

-if(?DOCATTRS).
-doc """
Creates an iterator.
""".
-endif.
%% Pairs a callback module with its initial state. No callback is invoked here.
-spec new(module(), state()) -> iterator().
new(Module, State) -> {Module, State}.

-if(?DOCATTRS).
-doc """
Executes a handle_call on the iterator.
""".
-endif.
%% Sends `Call` to the iterator's callback module and returns the reply along
%% with an iterator that wraps the updated state. A fresh reference is passed
%% as the second argument of handle_call/3.
-spec call(iterator(), term()) -> {term(), iterator()}.
call(Iterator, Call) ->
    {Mod, St0} = module_state(Iterator),
    Ref = make_ref(),
    {reply, Reply, St1} = Mod:handle_call(Call, Ref, St0),
    {Reply, new(Mod, St1)}.

-if(?DOCATTRS).
-doc """
Progresses the iterator one step.
""".
-endif.
%% Advances the iterator by a single step by delegating to the callback
%% module's handle_next/1, then re-wraps the returned state as an iterator.
-spec next(iterator()) ->
    {cont, [result()], iterator()} | {halt, [result()], iterator()} | {halt, iterator()}.
next({Mod, St0}) ->
    case Mod:handle_next(St0) of
        {halt, St1} ->
            {halt, {Mod, St1}};
        %% `cont` and `halt` with results only differ in their tag.
        {Tag, Results, St1} when Tag =:= cont; Tag =:= halt ->
            {Tag, Results, {Mod, St1}}
    end.

-if(?DOCATTRS).
-doc """
Stops the iterator.
""".
-endif.
%% Asks the callback module to release whatever the iterator holds.
%% Crashes with badmatch if handle_stop/1 returns anything other than `ok`.
-spec stop(iterator()) -> ok.
stop({Mod, St}) ->
    ok = Mod:handle_stop(St),
    ok.

-if(?DOCATTRS).
-doc """
Runs the iterator to completion.

Returns the accumulated list of results together with the final iterator.

The iterator is stopped with `stop/1` as soon as it halts, so it has already
been stopped by the time this function returns.
""".
-endif.
-spec run(iterator()) -> {list(result()), iterator()}.
run(Iterator) ->
    %% A single iterator is simply a pipeline of length one.
    [{Results, Final}] = pipeline([Iterator]),
    {Results, Final}.

-if(?DOCATTRS).
-doc """
Runs all iterators to completion, interleaving their steps.

Each round calls `next/1` once on every iterator that has not halted yet.
The returned list has one `{Results, Iterator}` entry per input iterator, in
the same order as the input.

Every iterator is stopped with `stop/1` as soon as it halts, so all of them
have already been stopped by the time this function returns.
""".
-endif.
-spec pipeline([iterator()]) -> [{list(result()), iterator()}].
pipeline(List) ->
    Len = length(List),
    %% One accumulator slot per iterator, addressed by its 1-based position.
    Acc = erlang:make_tuple(Len, []),
    IxList = lists:zip(lists:seq(1, Len), List),
    pipeline(IxList, Acc).

-if(?DOCATTRS).
-doc """
Identifies the module and the specific state of the iterator implementation.
""".
-endif.
%% Exposes the callback module and its current state. An iterator already is
%% that pair, so it is returned as-is once its shape has been checked.
-spec module_state(iterator()) -> {module(), state()}.
module_state({_Module, _State} = Iterator) ->
    Iterator.

%% Drives every still-running iterator one step per round until none remain.
%%
%% `Acc` is a tuple with one slot per original iterator. While an iterator is
%% running, its slot holds the batches received so far, newest first. Once the
%% iterator halts, the slot is replaced by `{FlatResults, FinalIterator}`.
pipeline([], Acc) ->
    tuple_to_list(Acc);
pipeline(IxList, Acc) ->
    {Pending, Acc1} = lists:foldl(fun pipeline_step/2, {[], Acc}, IxList),
    %% The fold collects survivors in reverse; restore the original order.
    pipeline(lists:reverse(Pending), Acc1).

%% Advances one iterator and records the outcome in the accumulator tuple.
pipeline_step({Ix, Iterator}, {Pending, Acc}) ->
    case next(Iterator) of
        {cont, [], Iterator1} ->
            %% Nothing new to record; keep the iterator for the next round.
            {[{Ix, Iterator1} | Pending], Acc};
        {cont, Results, Iterator1} ->
            Batches = erlang:element(Ix, Acc),
            {[{Ix, Iterator1} | Pending], erlang:setelement(Ix, Acc, [Results | Batches])};
        {halt, Iterator1} ->
            {Pending, pipeline_finish(Ix, Iterator1, erlang:element(Ix, Acc), Acc)};
        {halt, Results, Iterator1} ->
            %% An empty final batch contributes nothing once flattened.
            Batches = [Results | erlang:element(Ix, Acc)],
            {Pending, pipeline_finish(Ix, Iterator1, Batches, Acc)}
    end.

%% Stops a halted iterator and stores its flattened results, oldest first.
pipeline_finish(Ix, Iterator, Batches, Acc) ->
    ok = stop(Iterator),
    Flat = lists:append(lists:reverse(Batches)),
    erlang:setelement(Ix, Acc, {Flat, Iterator}).
239 changes: 239 additions & 0 deletions src/erlfdb_range_iterator.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
-module(erlfdb_range_iterator).
-behaviour(erlfdb_iterator).

-define(DOCATTRS, ?OTP_RELEASE >= 27).

-if(?DOCATTRS).
-moduledoc """
An iterator interface over GetRange and GetMappedRange.

The iterator is an alternative implementation to `erlfdb:get_range/4` and
`erlfdb:wait_for_all_interleaving/3`. The database mechanics are equivalent,
but the iterator allows you to control when each wait actually happens.

Remember that FoundationDB transaction execution time is limited to 5 seconds,
so you must take care not to delay your iteration.

To use the iterator,

1. `erlfdb_range_iterator:start/4`: Sends the first GetRange request to the
database server.
2. `erlfdb_iterator:next/1`: Waits for the result of the GetRange request.
Then issues another GetRange request to the database server.
3. `erlfdb_iterator:stop/1`: Cancels the active future, if one exists.
""".
-endif.

%% Public API.
-export([start/3, start/4, get_future/1]).
%% erlfdb_iterator callbacks.
-export([handle_next/1, handle_stop/1]).

%% Iterator state for one range read.
-record(state, {
% Transaction handle handed to the erlfdb_nif range calls.
tx,
% Current range bounds. Built with erlfdb_key:to_selector/1 and narrowed by
% next_state/3 after every page.
start_key,
end_key,
% `undefined` for a plain range read, or a binary mapper for a mapped range read.
mapper,
% Remaining row limit; 0 is treated as "no limit" by handle_result/2.
limit,
% Options forwarded unchanged to the NIF on every request.
target_bytes,
streaming_mode,
% Request counter; defaults to 1 and is incremented for each follow-up request.
iteration,
snapshot,
% Integer flag; any non-zero value makes next_state/3 shrink the end of the
% range instead of the start.
reverse,
% Outstanding request set by send_get_range/1, or `undefined` when none is pending.
future
}).

-type state() :: #state{}.
%% One batch of rows as returned by a single range request.
-type page() :: list(erlfdb:kv()) | list(erlfdb:mapped_kv()).

-export_type([page/0]).

-if(?DOCATTRS).
-doc """
Starts the iterator.

Equivalent to `start(Tx, StartKey, EndKey, [])`.
""".
-endif.
%% Convenience arity: starts the iterator with an empty option list.
-spec start(erlfdb:transaction(), erlfdb:key(), erlfdb:key()) ->
    erlfdb_iterator:iterator().
start(Tx, StartKey, EndKey) ->
    NoOptions = [],
    start(Tx, StartKey, EndKey, NoOptions).

-if(?DOCATTRS).
-doc """
Starts the iterator.

Sends the first GetRange request to the database server.
""".
-endif.
%% Builds the initial state from the options, issues the first range request
%% and wraps the resulting state as an erlfdb_iterator.
-spec start(erlfdb:transaction(), erlfdb:key(), erlfdb:key(), [erlfdb:fold_option()]) ->
    erlfdb_iterator:iterator().
start(Tx, StartKey, EndKey, Options) ->
    Initial = new_state(Tx, StartKey, EndKey, Options),
    Requested = send_get_range(Initial),
    erlfdb_iterator:new(?MODULE, Requested).

%% erlfdb_iterator callback: advances the iterator by one page.
%%
%% With no outstanding future the iteration is already finished and the state
%% is returned unchanged. Otherwise this waits on the pending future and lets
%% handle_result/2 decide whether to continue.
%%
%% Returns:
%%   {cont, [Page], State} - a page was read and a follow-up request was sent
%%   {halt, [Page], State} - the final non-empty page
%%   {halt, State}         - nothing (more) to return
-spec handle_next(state()) ->
    {halt, state()} | {halt, [page()], state()} | {cont, [page()], state()}.
handle_next(State = #state{future = undefined}) ->
    {halt, State};
handle_next(State = #state{future = Future}) ->
    Result = erlfdb:wait(Future, []),
    handle_result(Result, State).

%% erlfdb_iterator callback: cancels the outstanding future, if there is one.
-spec handle_stop(state()) -> ok.
handle_stop(#state{future = Future}) ->
    case Future of
        undefined ->
            ok;
        _ ->
            erlfdb:cancel(Future),
            ok
    end.

-if(?DOCATTRS).
-doc """
Gets the active future from the iterator's internal state.

You needn't call this function for normal use of the iterator.
""".
-endif.
%% Reads the `future` field of the state; `undefined` when no request is pending.
-spec get_future(state()) -> undefined | erlfdb:future().
get_future(State = #state{}) ->
    State#state.future.

%% Turns one resolved range response into an iterator step.
%%
%% `RawRows` are the rows of this response, `Count` the number the response
%% reports and `HasMore` whether the server indicates further rows. The stored
%% future is cleared first; when iteration continues, a follow-up request is
%% sent immediately and its future is stored in the returned state.
%%
%% Returns {halt, State} for an empty page, {cont, [Rows], State} when another
%% request was issued, and {halt, [Rows], State} for the final non-empty page.
handle_result({RawRows, Count, HasMore}, State = #state{limit = Limit}) ->
    %% A limit of 0 means "no limit". Otherwise the limit is only exhausted by
    %% this page once it no longer exceeds the number of rows received.
    WithinLimit = Limit == 0 orelse Limit > Count,
    Rows =
        case WithinLimit of
            true -> RawRows;
            false -> lists:sublist(RawRows, Limit)
        end,
    %% The future has been consumed by the caller's wait.
    Idle = State#state{future = undefined},
    case Rows of
        [] ->
            %% An empty page always ends the iteration, whatever HasMore says.
            {halt, Idle};
        _ when WithinLimit, HasMore ->
            LastKey = get_last_key(Rows, Idle),
            {cont, [Rows], send_get_range(next_state(LastKey, Count, Idle))};
        _ ->
            {halt, [Rows], Idle}
    end.

%% Builds the initial iterator state from the caller's options.
%% Missing options fall back to the defaults spelled out below; the `future`
%% field is left unset until the first request is sent.
new_state(Tx, StartKey, EndKey, Options) ->
    Option = fun(Name, Default) -> erlfdb_util:get(Options, Name, Default) end,
    %% The `reverse` option may be a boolean or an integer; store an integer.
    ReverseFlag =
        case Option(reverse, false) of
            true -> 1;
            false -> 0;
            N when is_integer(N) -> N
        end,
    Mapper = erlfdb_util:get(Options, mapper),
    ok = assert_mapper(Mapper),
    #state{
        tx = Tx,
        start_key = erlfdb_key:to_selector(StartKey),
        end_key = erlfdb_key:to_selector(EndKey),
        mapper = Mapper,
        limit = Option(limit, 0),
        target_bytes = Option(target_bytes, 0),
        streaming_mode = Option(streaming_mode, want_all),
        iteration = Option(iteration, 1),
        snapshot = Option(snapshot, false),
        reverse = ReverseFlag
    }.

%% Accepts an absent mapper or a binary one; anything else is a caller error
%% and raises {badarg, mapper, Mapper}.
assert_mapper(Mapper) when Mapper =:= undefined; is_binary(Mapper) ->
    ok;
assert_mapper(Mapper) ->
    erlang:error({badarg, mapper, Mapper}).

%% Prepares the state for the follow-up request after a page ending at `LastKey`.
%%
%% The bound on the side already read is moved to `LastKey`: the start for a
%% forward scan, the end for a reverse scan. The remaining limit is reduced by
%% the `Count` rows just received, never below 0, and the iteration counter is
%% incremented.
next_state(LastKey, Count, State = #state{limit = Limit, iteration = Iteration}) ->
    Reverse = State#state.reverse,
    Narrowed =
        if
            Reverse == 0 ->
                State#state{start_key = erlfdb_key:first_greater_than(LastKey)};
            true ->
                State#state{end_key = erlfdb_key:first_greater_or_equal(LastKey)}
        end,
    Narrowed#state{
        limit = max(0, Limit - Count),
        iteration = Iteration + 1
    }.

%% Issues the next range request described by the state and stores the
%% resulting future. A plain range read is used when no mapper is configured,
%% a mapped range read otherwise; every other argument is identical.
send_get_range(State = #state{mapper = Mapper}) ->
    #state{
        tx = Tx,
        start_key = StartKey,
        end_key = EndKey,
        limit = Limit,
        target_bytes = TargetBytes,
        streaming_mode = StreamingMode,
        iteration = Iteration,
        snapshot = Snapshot,
        reverse = Reverse
    } = State,
    Future =
        case Mapper of
            undefined ->
                erlfdb_nif:transaction_get_range(
                    Tx,
                    StartKey,
                    EndKey,
                    Limit,
                    TargetBytes,
                    StreamingMode,
                    Iteration,
                    Snapshot,
                    Reverse
                );
            _ ->
                erlfdb_nif:transaction_get_mapped_range(
                    Tx,
                    StartKey,
                    EndKey,
                    Mapper,
                    Limit,
                    TargetBytes,
                    StreamingMode,
                    Iteration,
                    Snapshot,
                    Reverse
                )
        end,
    State#state{future = Future}.

%% Extracts the key of the last row in a non-empty page.
%% Plain rows are {Key, Value} pairs; mapped rows are 3-tuples whose first
%% element is such a pair.
get_last_key(Rows, #state{mapper = undefined}) ->
    {Key, _Value} = lists:last(Rows),
    Key;
get_last_key(Rows, #state{}) ->
    {ParentKV, _, _} = lists:last(Rows),
    {Key, _Value} = ParentKV,
    Key.
Loading