Skip to content

Commit dbb94c7

Browse files
committed
Generic iterator module
1 parent a378976 commit dbb94c7

4 files changed

Lines changed: 239 additions & 76 deletions

File tree

src/erlfdb_iterator.erl

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
-module(erlfdb_iterator).
2+
3+
-define(DOCATTRS, ?OTP_RELEASE >= 27).
4+
5+
-if(?DOCATTRS).
6+
-moduledoc """
7+
A generic iterator behaviour, useful for streaming results from the database.
8+
""".
9+
-endif.
10+
11+
-type state() :: term().
12+
-type iterator() :: {module(), state()}.
13+
-type result() :: term().
14+
15+
-callback handle_call(term(), term(), state()) -> {reply, term(), state()}.
16+
-callback handle_next(state()) ->
17+
{cont, [result()], state()} | {halt, [result()], state()} | {halt, state()}.
18+
-callback handle_stop(state()) -> ok.
19+
20+
-optional_callbacks([handle_call/3]).
21+
22+
-export_type([iterator/0, result/0]).
23+
24+
-export([new/2, next/1, stop/1, run/1, call/2, pipeline/1, module_state/1]).
25+
26+
-if(?DOCATTRS).
27+
-doc """
28+
Creates an iterator.
29+
""".
30+
-endif.
31+
-spec new(module(), state()) -> iterator().
32+
new(Module, State) ->
33+
{Module, State}.
34+
35+
-if(?DOCATTRS).
36+
-doc """
37+
Executes a handle_call on the iterator.
38+
""".
39+
-endif.
40+
-spec call(iterator(), term()) -> {term(), iterator()}.
41+
call(Iterator, Call) ->
42+
{Module, State} = module_state(Iterator),
43+
{reply, Reply, State1} = Module:handle_call(Call, make_ref(), State),
44+
{Reply, new(Module, State1)}.
45+
46+
-if(?DOCATTRS).
47+
-doc """
48+
Progresses the iterator one step.
49+
""".
50+
-endif.
51+
-spec next(iterator()) ->
52+
{cont, [result()], iterator()} | {halt, [result()], iterator()} | {halt, iterator()}.
53+
next({Module, State}) ->
54+
case Module:handle_next(State) of
55+
{cont, Results, State1} ->
56+
{cont, Results, {Module, State1}};
57+
{halt, Results, State1} ->
58+
{halt, Results, {Module, State1}};
59+
{halt, State1} ->
60+
{halt, {Module, State1}}
61+
end.
62+
63+
-if(?DOCATTRS).
64+
-doc """
65+
Stops the iterator.
66+
""".
67+
-endif.
68+
-spec stop(iterator()) -> ok.
69+
stop({Module, State}) ->
70+
ok = Module:handle_stop(State).
71+
72+
-if(?DOCATTRS).
73+
-doc """
74+
Runs the iterator to completion.
75+
76+
Returns the list of results and the state of the iterator.
77+
78+
Caller must call `stop/1` to terminate the iterator.
79+
""".
80+
-endif.
81+
-spec run(iterator()) -> {list(result()), iterator()}.
82+
run(Iterator) ->
83+
[{Result, Iterator1}] = pipeline([Iterator]),
84+
{Result, Iterator1}.
85+
86+
-if(?DOCATTRS).
87+
-doc """
88+
Runs all iterators to completion.
89+
90+
Caller must call `stop/1` to terminate the iterators.
91+
""".
92+
-endif.
93+
-spec pipeline([iterator()]) -> [{list(result()), iterator()}].
94+
pipeline(List) ->
95+
Len = length(List),
96+
Acc = erlang:make_tuple(Len, []),
97+
IxList = lists:zip(lists:seq(1, Len), List),
98+
pipeline(IxList, Acc).
99+
100+
-if(?DOCATTRS).
101+
-doc """
102+
Identifies the module and the specific state of the iterator implementation.
103+
""".
104+
-endif.
105+
-spec module_state(iterator()) -> {module(), state()}.
106+
module_state({Module, State}) -> {Module, State}.
107+
108+
pipeline([], Acc) ->
109+
tuple_to_list(Acc);
110+
pipeline(IxList, Acc) ->
111+
{Remaining, Acc1} = lists:foldl(
112+
fun({Ix, Iterator}, {Rem0, Acc0}) ->
113+
case next(Iterator) of
114+
{halt, Iterator1} ->
115+
ok = stop(Iterator1),
116+
AccResults = lists:append(lists:reverse(erlang:element(Ix, Acc0))),
117+
{Rem0, erlang:setelement(Ix, Acc0, {AccResults, Iterator1})};
118+
{halt, [], Iterator1} ->
119+
ok = stop(Iterator1),
120+
AccResults = lists:append(lists:reverse(erlang:element(Ix, Acc0))),
121+
{Rem0, erlang:setelement(Ix, Acc0, {AccResults, Iterator1})};
122+
{halt, Results, Iterator1} ->
123+
ok = stop(Iterator1),
124+
AccResults = lists:append(lists:reverse([Results | erlang:element(Ix, Acc0)])),
125+
{Rem0, erlang:setelement(Ix, Acc0, {AccResults, Iterator1})};
126+
{cont, [], Iterator1} ->
127+
{[{Ix, Iterator1} | Rem0], Acc0};
128+
{cont, Results, Iterator1} ->
129+
AccResults = erlang:element(Ix, Acc0),
130+
{[{Ix, Iterator1} | Rem0], erlang:setelement(Ix, Acc0, [Results | AccResults])}
131+
end
132+
end,
133+
{[], Acc},
134+
IxList
135+
),
136+
pipeline(lists:reverse(Remaining), Acc1).

src/erlfdb_range_iterator.erl

Lines changed: 42 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
-module(erlfdb_range_iterator).
2+
-behaviour(erlfdb_iterator).
23

34
-define(DOCATTRS, ?OTP_RELEASE >= 27).
45

@@ -17,13 +18,14 @@ To use the iterator,
1718

1819
1. `erlfdb_range_iterator:start/4`: Sends the first GetRange request to the
1920
database server.
20-
2. `erlfdb_range_iterator.next/1`: Waits for the result of the GetRange request.
21+
2. `erlfdb_iterator.next/1`: Waits for the result of the GetRange request.
2122
Then issues another GetRange request to the database server.
22-
3. `erlfdb_range_iterator.stop/1`: Cancels the active future, if one exists.
23+
3. `erlfdb_iterator.stop/1`: Cancels the active future, if one exists.
2324
""".
2425
-endif.
2526

26-
-export([start/4, next/1, stop/1, get_future/1]).
27+
-export([start/3, start/4, get_future/1]).
28+
-export([handle_next/1, handle_stop/1]).
2729

2830
-record(state, {
2931
tx,
@@ -39,50 +41,50 @@ To use the iterator,
3941
future
4042
}).
4143

42-
-type state() :: term().
43-
-export_type([state/0]).
44+
-type state() :: #state{}.
45+
-type page() :: list(erlfdb:kv()) | list(erlfdb:mapped_kv()).
46+
47+
-export_type([page/0]).
4448

4549
-if(?DOCATTRS).
4650
-doc """
4751
Starts the iterator.
4852

49-
Sends the first GetRange request to the database server.
53+
Equivalent to `start(Tx, StartKey, EndKey, [])`.
5054
""".
5155
-endif.
52-
-spec start(erlfdb:transaction(), erlfdb:key(), erlfdb:key(), [erlfdb:fold_option()]) ->
53-
{ok, state()}.
54-
start(Tx, StartKey, EndKey, Options) ->
55-
State = new_state(Tx, StartKey, EndKey, Options),
56-
State1 = send_get_range(State),
57-
{ok, State1}.
56+
-spec start(erlfdb:transaction(), erlfdb:key(), erlfdb:key()) ->
57+
erlfdb_iterator:iterator().
58+
start(Tx, StartKey, EndKey) ->
59+
start(Tx, StartKey, EndKey, []).
5860

5961
-if(?DOCATTRS).
6062
-doc """
61-
Progresses the iterator, returning data that is retrieved.
63+
Starts the iterator.
6264

63-
Waits for the active future, sends the next GetRange request, and returns the data.
65+
Sends the first GetRange request to the database server.
6466
""".
6567
-endif.
66-
-spec next(state()) ->
67-
{done, state()} | {ok, list(erlfdb:kv()) | list(erlfdb:mapped_kv()), state()}.
68-
next(State = #state{future = undefined}) ->
69-
{done, State};
70-
next(State = #state{}) ->
68+
-spec start(erlfdb:transaction(), erlfdb:key(), erlfdb:key(), [erlfdb:fold_option()]) ->
69+
erlfdb_iterator:iterator().
70+
start(Tx, StartKey, EndKey, Options) ->
71+
State = new_state(Tx, StartKey, EndKey, Options),
72+
State1 = send_get_range(State),
73+
erlfdb_iterator:new(?MODULE, State1).
74+
75+
-spec handle_next(state()) ->
76+
{halt, state()} | {cont, [page()], state()}.
77+
handle_next(State = #state{future = undefined}) ->
78+
{halt, State};
79+
handle_next(State = #state{}) ->
7180
#state{future = Future} = State,
7281
Result = erlfdb:wait(Future, []),
7382
handle_result(Result, State).
7483

75-
-if(?DOCATTRS).
76-
-doc """
77-
Stops the iterator.
78-
79-
If there is an active future, it is cancelled.
80-
""".
81-
-endif.
82-
-spec stop(state()) -> ok.
83-
stop(_State = #state{future = undefined}) ->
84+
-spec handle_stop(state()) -> ok.
85+
handle_stop(_State = #state{future = undefined}) ->
8486
ok;
85-
stop(_State = #state{future = Future}) ->
87+
handle_stop(_State = #state{future = Future}) ->
8688
erlfdb:cancel(Future),
8789
ok.
8890

@@ -118,7 +120,17 @@ handle_result({RawRows, Count, HasMore}, State = #state{}) ->
118120
true ->
119121
State1
120122
end,
121-
{ok, Rows, State2}.
123+
124+
case {Rows, Recurse} of
125+
{[], false} ->
126+
{halt, State2};
127+
{[], true} ->
128+
{cont, [], State2};
129+
{_, false} ->
130+
{halt, [Rows], State2};
131+
{_, true} ->
132+
{cont, [Rows], State2}
133+
end.
122134

123135
new_state(Tx, StartKey, EndKey, Options) ->
124136
Reverse =

src/jms.erl

Lines changed: 0 additions & 14 deletions
This file was deleted.

0 commit comments

Comments
 (0)