From a378976ab072f0e5a4f6d29cdca6f1cf6e281367 Mon Sep 17 00:00:00 2001 From: Jesse Stimpson Date: Sun, 25 Jan 2026 12:27:10 -0500 Subject: [PATCH 1/2] Add an iterator interface for range requests --- CHANGELOG.md | 6 + src/erlfdb.app.src | 2 +- src/erlfdb_range_iterator.erl | 227 +++++++++++++++++++++++++ src/jms.erl | 14 ++ test/erlfdb_02_anon_fdbserver_test.erl | 93 ++++++++++ 5 files changed, 341 insertions(+), 1 deletion(-) create mode 100644 src/erlfdb_range_iterator.erl create mode 100644 src/jms.erl diff --git a/CHANGELOG.md b/CHANGELOG.md index ea2ca22..dd9e5aa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## v0.3.4 (TBD) + +### Enhancements + + * (#65) Add an iterator interface for Range requests. + ## v0.3.3 (2025-12-21) ### Enhancements diff --git a/src/erlfdb.app.src b/src/erlfdb.app.src index 02a9a58..1af6105 100644 --- a/src/erlfdb.app.src +++ b/src/erlfdb.app.src @@ -12,7 +12,7 @@ {application, erlfdb, [ {description, "Erlang client for FoundationDB"}, - {vsn, "0.3.3"}, + {vsn, "0.3.4"}, {registered, []}, {applications, [kernel, stdlib]}, {maintainers, ["Jesse Stimpson"]}, diff --git a/src/erlfdb_range_iterator.erl b/src/erlfdb_range_iterator.erl new file mode 100644 index 0000000..e17c94c --- /dev/null +++ b/src/erlfdb_range_iterator.erl @@ -0,0 +1,227 @@ +-module(erlfdb_range_iterator). + +-define(DOCATTRS, ?OTP_RELEASE >= 27). + +-if(?DOCATTRS). +-moduledoc """ +An iterator interface over GetRange and GetMappedRange. + +The iterator is an alternative implmentation to `erlfdb:get_range/4` and +`erlfdb:wait_for_all_interleaving/3`. The database mechanics are equivalent, +but the iterator allows you to control when each wait actually happens. + +Remember that FoundationDB transaction execution time is limited to 5 seconds, +so you must take care not to delay your iteration. + +To use the iterator, + +1. `erlfdb_range_iterator:start/4`: Sends the first GetRange request to the + database server. +2. `erlfdb_range_iterator.next/1`: Waits for the result of the GetRange request. + Then issues another GetRange request to the database server. +3. `erlfdb_range_iterator.stop/1`: Cancels the active future, if one exists. +""". +-endif. + +-export([start/4, next/1, stop/1, get_future/1]). + +-record(state, { + tx, + start_key, + end_key, + mapper, + limit, + target_bytes, + streaming_mode, + iteration, + snapshot, + reverse, + future +}). + +-type state() :: term(). +-export_type([state/0]). + +-if(?DOCATTRS). +-doc """ +Starts the iterator. + +Sends the first GetRange request to the database server. +""". +-endif. +-spec start(erlfdb:transaction(), erlfdb:key(), erlfdb:key(), [erlfdb:fold_option()]) -> + {ok, state()}. +start(Tx, StartKey, EndKey, Options) -> + State = new_state(Tx, StartKey, EndKey, Options), + State1 = send_get_range(State), + {ok, State1}. + +-if(?DOCATTRS). +-doc """ +Progresses the iterator, returning data that is retrieved. + +Waits for the active future, sends the next GetRange request, and returns the data. +""". +-endif. +-spec next(state()) -> + {done, state()} | {ok, list(erlfdb:kv()) | list(erlfdb:mapped_kv()), state()}. +next(State = #state{future = undefined}) -> + {done, State}; +next(State = #state{}) -> + #state{future = Future} = State, + Result = erlfdb:wait(Future, []), + handle_result(Result, State). + +-if(?DOCATTRS). +-doc """ +Stops the iterator. + +If there is an active future, it is cancelled. +""". +-endif. +-spec stop(state()) -> ok. +stop(_State = #state{future = undefined}) -> + ok; +stop(_State = #state{future = Future}) -> + erlfdb:cancel(Future), + ok. + +-if(?DOCATTRS). +-doc """ +Gets the active future from the iterator's internal state. + +You needn't call this function for normal use of the iterator. +""". +-endif. +-spec get_future(state()) -> undefined | erlfdb:future(). +get_future(#state{future = Future}) -> Future. + +handle_result({RawRows, Count, HasMore}, State = #state{}) -> + #state{limit = Limit} = State, + % If our limit is within the current set of + % rows we need to truncate the list + Rows = + if + Limit == 0 orelse Limit > Count -> RawRows; + true -> lists:sublist(RawRows, Limit) + end, + + % Determine if we have more rows to iterate + Recurse = (Rows /= []) and (Limit == 0 orelse Limit > Count) and HasMore, + + State1 = State#state{future = undefined}, + State2 = + if + Recurse -> + LastKey = get_last_key(Rows, State1), + send_get_range(next_state(LastKey, Count, State1)); + true -> + State1 + end, + {ok, Rows, State2}. + +new_state(Tx, StartKey, EndKey, Options) -> + Reverse = + case erlfdb_util:get(Options, reverse, false) of + true -> 1; + false -> 0; + I when is_integer(I) -> I + end, + Mapper = erlfdb_util:get(Options, mapper), + ok = assert_mapper(Mapper), + #state{ + tx = Tx, + start_key = erlfdb_key:to_selector(StartKey), + end_key = erlfdb_key:to_selector(EndKey), + mapper = Mapper, + limit = erlfdb_util:get(Options, limit, 0), + target_bytes = erlfdb_util:get(Options, target_bytes, 0), + streaming_mode = erlfdb_util:get(Options, streaming_mode, want_all), + iteration = erlfdb_util:get(Options, iteration, 1), + snapshot = erlfdb_util:get(Options, snapshot, false), + reverse = Reverse + }. + +assert_mapper(undefined) -> ok; +assert_mapper(Mapper) when is_binary(Mapper) -> ok; +assert_mapper(Mapper) -> erlang:error({badarg, mapper, Mapper}). + +next_state(LastKey, Count, State = #state{}) -> + #state{ + start_key = StartKey, + end_key = EndKey, + limit = Limit, + reverse = Reverse, + iteration = Iteration + } = State, + {NextStartKey, NextEndKey} = + case Reverse /= 0 of + true -> + {StartKey, erlfdb_key:first_greater_or_equal(LastKey)}; + false -> + {erlfdb_key:first_greater_than(LastKey), EndKey} + end, + State#state{ + start_key = NextStartKey, + end_key = NextEndKey, + limit = max(0, Limit - Count), + iteration = Iteration + 1 + }. + +send_get_range(State = #state{mapper = undefined}) -> + #state{ + tx = Tx, + start_key = StartKey, + end_key = EndKey, + limit = Limit, + target_bytes = TargetBytes, + streaming_mode = StreamingMode, + iteration = Iteration, + snapshot = Snapshot, + reverse = Reverse + } = State, + Future = erlfdb_nif:transaction_get_range( + Tx, + StartKey, + EndKey, + Limit, + TargetBytes, + StreamingMode, + Iteration, + Snapshot, + Reverse + ), + State#state{future = Future}; +send_get_range(State = #state{mapper = Mapper}) -> + #state{ + tx = Tx, + start_key = StartKey, + end_key = EndKey, + limit = Limit, + target_bytes = TargetBytes, + streaming_mode = StreamingMode, + iteration = Iteration, + snapshot = Snapshot, + reverse = Reverse + } = State, + Future = erlfdb_nif:transaction_get_mapped_range( + Tx, + StartKey, + EndKey, + Mapper, + Limit, + TargetBytes, + StreamingMode, + Iteration, + Snapshot, + Reverse + ), + State#state{future = Future}. + +get_last_key(Rows, _State = #state{mapper = undefined}) -> + {K, _V} = lists:last(Rows), + K; +get_last_key(Rows, _State = #state{}) -> + {KV, _, _} = lists:last(Rows), + {K, _} = KV, + K. diff --git a/src/jms.erl b/src/jms.erl new file mode 100644 index 0000000..06dec68 --- /dev/null +++ b/src/jms.erl @@ -0,0 +1,14 @@ +-module(jms). + +-export([start/0]). + +-define(CF, <<"/Users/jstimpson/dev/ex_fdbmonitor/.ex_fdbmonitor/dev.0/etc/fdb.cluster">>). + +start() -> + Db = erlfdb:open(?CF), + N = 11000, + io:format("Setting keys~n"), + [erlfdb:set(Db, erlfdb_tuple:pack({<<"foo">>, X}), <<"bar">>) || X <- lists:seq(1, N)], + io:format("Creating watches~n"), + Watches = [erlfdb:watch(Db, erlfdb_tuple:pack({<<"foo">>, X})) || X <- lists:seq(1, N)], + Watches. diff --git a/test/erlfdb_02_anon_fdbserver_test.erl b/test/erlfdb_02_anon_fdbserver_test.erl index 70bb02d..faa21c9 100644 --- a/test/erlfdb_02_anon_fdbserver_test.erl +++ b/test/erlfdb_02_anon_fdbserver_test.erl @@ -417,6 +417,99 @@ watch_to_test() -> error(timeout) end. +range_iterator_test() -> + Db = erlfdb_sandbox:open(), + Tenant = erlfdb_util:create_and_open_test_tenant(Db, [empty]), + KVs = create_range(Tenant, <<"range_iterator_test">>, 3), + {StartKey, EndKey} = erlfdb_tuple:range({<<"range_iterator_test">>}), + + % Single-page GetRange + ?assertMatch( + KVs, + erlfdb:transactional(Tenant, fun(Tx) -> + {ok, State} = erlfdb_range_iterator:start(Tx, StartKey, EndKey, []), + {ok, Rows, State1} = erlfdb_range_iterator:next(State), + {done, State2} = erlfdb_range_iterator:next(State1), + erlfdb_range_iterator:stop(State2), + [{erlfdb_tuple:unpack(K), erlfdb_tuple:unpack(V)} || {K, V} <- Rows] + end) + ), + + % Reverse GetRange - returned page is reversed, this is consistent with the FDB API + ?assertEqual( + lists:reverse(lists:sublist(KVs, 2, 2)), + erlfdb:transactional(Tenant, fun(Tx) -> + {ok, State} = erlfdb_range_iterator:start(Tx, StartKey, EndKey, [ + {limit, 2}, {reverse, true} + ]), + {ok, Rows, State1} = erlfdb_range_iterator:next(State), + {done, State2} = erlfdb_range_iterator:next(State1), + erlfdb_range_iterator:stop(State2), + [{erlfdb_tuple:unpack(K), erlfdb_tuple:unpack(V)} || {K, V} <- Rows] + end) + ), + + % Multi-page GetRange + ?assertMatch( + KVs, + erlfdb:transactional(Tenant, fun(Tx) -> + {ok, State} = erlfdb_range_iterator:start(Tx, StartKey, EndKey, [{target_bytes, 1}]), + {ok, [Row1], State1} = erlfdb_range_iterator:next(State), + {ok, [Row2], State2} = erlfdb_range_iterator:next(State1), + {ok, [Row3], State3} = erlfdb_range_iterator:next(State2), + {ok, [], State4} = erlfdb_range_iterator:next(State3), + {done, State5} = erlfdb_range_iterator:next(State4), + erlfdb_range_iterator:stop(State5), + [{erlfdb_tuple:unpack(K), erlfdb_tuple:unpack(V)} || {K, V} <- [Row1, Row2, Row3]] + end) + ), + + % Early Cancel + ?assertMatch( + ok, + erlfdb:transactional(Tenant, fun(Tx) -> + {ok, State} = erlfdb_range_iterator:start(Tx, StartKey, EndKey, [{target_bytes, 1}]), + {ok, [_Row1], State1} = erlfdb_range_iterator:next(State), + Future = erlfdb_range_iterator:get_future(State1), + true = is_tuple(Future), + ok = erlfdb_range_iterator:stop(State1) + end) + ), + + % GetMappedRange + Vsn = erlfdb_nif:get_max_api_version(), + + if + Vsn >= 730 -> + Mapper = create_mapping_on_range( + Tenant, <<"range_iterator_test">>, 3, <<"hello world">> + ), + MStartKey = erlfdb_tuple:pack({<<"range_iterator_test">>, 1}), + MEndKey = erlfdb_key:strinc(erlfdb_tuple:pack({<<"range_iterator_test">>, 3})), + + ?assertMatch( + [<<"hello world">>, <<"hello world">>, <<"hello world">>], + erlfdb:transactional(Tenant, fun(Tx) -> + {ok, State} = erlfdb_range_iterator:start(Tx, MStartKey, MEndKey, [ + {mapper, erlfdb_tuple:pack(Mapper)}, {target_bytes, 10} + ]), + {ok, [{{_PKey, _PValue}, {_SKeyBegin, _SKeyEnd}, [{_Key, Message1}]}], State1} = erlfdb_range_iterator:next( + State + ), + {ok, [{{_PKey1, _PValue1}, {_SKeyBegin1, _SKeyEnd1}, [{_Key1, Message2}]}], + State2} = erlfdb_range_iterator:next(State1), + {ok, [{{_PKey2, _PValue2}, {_SKeyBegin2, _SKeyEnd2}, [{_Key2, Message3}]}], + State3} = erlfdb_range_iterator:next(State2), + {ok, [], State4} = erlfdb_range_iterator:next(State3), + {done, State5} = erlfdb_range_iterator:next(State4), + erlfdb_range_iterator:stop(State5), + [Message1, Message2, Message3] + end) + ); + true -> + ok + end. + get_set_get(DbOrTenant) -> Key = gen_key(8), Val = crypto:strong_rand_bytes(8), From dbb94c76dd6dd661ad29f9dd9b2002a2efefbc0e Mon Sep 17 00:00:00 2001 From: Jesse Stimpson Date: Mon, 26 Jan 2026 20:09:23 -0500 Subject: [PATCH 2/2] Generic iterator module --- src/erlfdb_iterator.erl | 136 +++++++++++++++++++++++++ src/erlfdb_range_iterator.erl | 72 +++++++------ src/jms.erl | 14 --- test/erlfdb_02_anon_fdbserver_test.erl | 93 +++++++++++------ 4 files changed, 239 insertions(+), 76 deletions(-) create mode 100644 src/erlfdb_iterator.erl delete mode 100644 src/jms.erl diff --git a/src/erlfdb_iterator.erl b/src/erlfdb_iterator.erl new file mode 100644 index 0000000..d97946c --- /dev/null +++ b/src/erlfdb_iterator.erl @@ -0,0 +1,136 @@ +-module(erlfdb_iterator). + +-define(DOCATTRS, ?OTP_RELEASE >= 27). + +-if(?DOCATTRS). +-moduledoc """ +A generic iterator behaviour, useful for streaming results from the database. +""". +-endif. + +-type state() :: term(). +-type iterator() :: {module(), state()}. +-type result() :: term(). + +-callback handle_call(term(), term(), state()) -> {reply, term(), state()}. +-callback handle_next(state()) -> + {cont, [result()], state()} | {halt, [result()], state()} | {halt, state()}. +-callback handle_stop(state()) -> ok. + +-optional_callbacks([handle_call/3]). + +-export_type([iterator/0, result/0]). + +-export([new/2, next/1, stop/1, run/1, call/2, pipeline/1, module_state/1]). + +-if(?DOCATTRS). +-doc """ +Creates an iterator. +""". +-endif. +-spec new(module(), state()) -> iterator(). +new(Module, State) -> + {Module, State}. + +-if(?DOCATTRS). +-doc """ +Executes a handle_call on the iterator. +""". +-endif. +-spec call(iterator(), term()) -> {term(), iterator()}. +call(Iterator, Call) -> + {Module, State} = module_state(Iterator), + {reply, Reply, State1} = Module:handle_call(Call, make_ref(), State), + {Reply, new(Module, State1)}. + +-if(?DOCATTRS). +-doc """ +Progresses the iterator one step. +""". +-endif. +-spec next(iterator()) -> + {cont, [result()], iterator()} | {halt, [result()], iterator()} | {halt, iterator()}. +next({Module, State}) -> + case Module:handle_next(State) of + {cont, Results, State1} -> + {cont, Results, {Module, State1}}; + {halt, Results, State1} -> + {halt, Results, {Module, State1}}; + {halt, State1} -> + {halt, {Module, State1}} + end. + +-if(?DOCATTRS). +-doc """ +Stops the iterator. +""". +-endif. +-spec stop(iterator()) -> ok. +stop({Module, State}) -> + ok = Module:handle_stop(State). + +-if(?DOCATTRS). +-doc """ +Runs the iterator to completion. + +Returns the list of results and the state of the iterator. + +Caller must call `stop/1` to terminate the iterator. +""". +-endif. +-spec run(iterator()) -> {list(result()), iterator()}. +run(Iterator) -> + [{Result, Iterator1}] = pipeline([Iterator]), + {Result, Iterator1}. + +-if(?DOCATTRS). +-doc """ +Runs all iterators to completion. + +Caller must call `stop/1` to terminate the iterators. +""". +-endif. +-spec pipeline([iterator()]) -> [{list(result()), iterator()}]. +pipeline(List) -> + Len = length(List), + Acc = erlang:make_tuple(Len, []), + IxList = lists:zip(lists:seq(1, Len), List), + pipeline(IxList, Acc). + +-if(?DOCATTRS). +-doc """ +Identifies the module and the specific state of the iterator implementation. +""". +-endif. +-spec module_state(iterator()) -> {module(), state()}. +module_state({Module, State}) -> {Module, State}. + +pipeline([], Acc) -> + tuple_to_list(Acc); +pipeline(IxList, Acc) -> + {Remaining, Acc1} = lists:foldl( + fun({Ix, Iterator}, {Rem0, Acc0}) -> + case next(Iterator) of + {halt, Iterator1} -> + ok = stop(Iterator1), + AccResults = lists:append(lists:reverse(erlang:element(Ix, Acc0))), + {Rem0, erlang:setelement(Ix, Acc0, {AccResults, Iterator1})}; + {halt, [], Iterator1} -> + ok = stop(Iterator1), + AccResults = lists:append(lists:reverse(erlang:element(Ix, Acc0))), + {Rem0, erlang:setelement(Ix, Acc0, {AccResults, Iterator1})}; + {halt, Results, Iterator1} -> + ok = stop(Iterator1), + AccResults = lists:append(lists:reverse([Results | erlang:element(Ix, Acc0)])), + {Rem0, erlang:setelement(Ix, Acc0, {AccResults, Iterator1})}; + {cont, [], Iterator1} -> + {[{Ix, Iterator1} | Rem0], Acc0}; + {cont, Results, Iterator1} -> + AccResults = erlang:element(Ix, Acc0), + {[{Ix, Iterator1} | Rem0], erlang:setelement(Ix, Acc0, [Results | AccResults])} + end + end, + {[], Acc}, + IxList + ), + pipeline(lists:reverse(Remaining), Acc1). diff --git a/src/erlfdb_range_iterator.erl b/src/erlfdb_range_iterator.erl index e17c94c..7f3006f 100644 --- a/src/erlfdb_range_iterator.erl +++ b/src/erlfdb_range_iterator.erl @@ -1,4 +1,5 @@ -module(erlfdb_range_iterator). +-behaviour(erlfdb_iterator). -define(DOCATTRS, ?OTP_RELEASE >= 27). @@ -17,13 +18,14 @@ To use the iterator, 1. `erlfdb_range_iterator:start/4`: Sends the first GetRange request to the database server. -2. `erlfdb_range_iterator.next/1`: Waits for the result of the GetRange request. +2. `erlfdb_iterator.next/1`: Waits for the result of the GetRange request. Then issues another GetRange request to the database server. -3. `erlfdb_range_iterator.stop/1`: Cancels the active future, if one exists. +3. `erlfdb_iterator.stop/1`: Cancels the active future, if one exists. """. -endif. --export([start/4, next/1, stop/1, get_future/1]). +-export([start/3, start/4, get_future/1]). +-export([handle_next/1, handle_stop/1]). -record(state, { tx, @@ -39,50 +41,50 @@ To use the iterator, future }). --type state() :: term(). --export_type([state/0]). +-type state() :: #state{}. +-type page() :: list(erlfdb:kv()) | list(erlfdb:mapped_kv()). + +-export_type([page/0]). -if(?DOCATTRS). -doc """ Starts the iterator. -Sends the first GetRange request to the database server. +Equivalent to `start(Tx, StartKey, EndKey, [])`. """. -endif. --spec start(erlfdb:transaction(), erlfdb:key(), erlfdb:key(), [erlfdb:fold_option()]) -> - {ok, state()}. -start(Tx, StartKey, EndKey, Options) -> - State = new_state(Tx, StartKey, EndKey, Options), - State1 = send_get_range(State), - {ok, State1}. +-spec start(erlfdb:transaction(), erlfdb:key(), erlfdb:key()) -> + erlfdb_iterator:iterator(). +start(Tx, StartKey, EndKey) -> + start(Tx, StartKey, EndKey, []). -if(?DOCATTRS). -doc """ -Progresses the iterator, returning data that is retrieved. +Starts the iterator. -Waits for the active future, sends the next GetRange request, and returns the data. +Sends the first GetRange request to the database server. """. -endif. --spec next(state()) -> - {done, state()} | {ok, list(erlfdb:kv()) | list(erlfdb:mapped_kv()), state()}. -next(State = #state{future = undefined}) -> - {done, State}; -next(State = #state{}) -> +-spec start(erlfdb:transaction(), erlfdb:key(), erlfdb:key(), [erlfdb:fold_option()]) -> + erlfdb_iterator:iterator(). +start(Tx, StartKey, EndKey, Options) -> + State = new_state(Tx, StartKey, EndKey, Options), + State1 = send_get_range(State), + erlfdb_iterator:new(?MODULE, State1). + +-spec handle_next(state()) -> + {halt, state()} | {cont, [page()], state()}. +handle_next(State = #state{future = undefined}) -> + {halt, State}; +handle_next(State = #state{}) -> #state{future = Future} = State, Result = erlfdb:wait(Future, []), handle_result(Result, State). --if(?DOCATTRS). --doc """ -Stops the iterator. - -If there is an active future, it is cancelled. -""". --endif. --spec stop(state()) -> ok. -stop(_State = #state{future = undefined}) -> +-spec handle_stop(state()) -> ok. +handle_stop(_State = #state{future = undefined}) -> ok; -stop(_State = #state{future = Future}) -> +handle_stop(_State = #state{future = Future}) -> erlfdb:cancel(Future), ok. @@ -118,7 +120,17 @@ handle_result({RawRows, Count, HasMore}, State = #state{}) -> true -> State1 end, - {ok, Rows, State2}. + + case {Rows, Recurse} of + {[], false} -> + {halt, State2}; + {[], true} -> + {cont, [], State2}; + {_, false} -> + {halt, [Rows], State2}; + {_, true} -> + {cont, [Rows], State2} + end. new_state(Tx, StartKey, EndKey, Options) -> Reverse = diff --git a/src/jms.erl b/src/jms.erl deleted file mode 100644 index 06dec68..0000000 --- a/src/jms.erl +++ /dev/null @@ -1,14 +0,0 @@ --module(jms). - --export([start/0]). - --define(CF, <<"/Users/jstimpson/dev/ex_fdbmonitor/.ex_fdbmonitor/dev.0/etc/fdb.cluster">>). - -start() -> - Db = erlfdb:open(?CF), - N = 11000, - io:format("Setting keys~n"), - [erlfdb:set(Db, erlfdb_tuple:pack({<<"foo">>, X}), <<"bar">>) || X <- lists:seq(1, N)], - io:format("Creating watches~n"), - Watches = [erlfdb:watch(Db, erlfdb_tuple:pack({<<"foo">>, X})) || X <- lists:seq(1, N)], - Watches. diff --git a/test/erlfdb_02_anon_fdbserver_test.erl b/test/erlfdb_02_anon_fdbserver_test.erl index faa21c9..5e96956 100644 --- a/test/erlfdb_02_anon_fdbserver_test.erl +++ b/test/erlfdb_02_anon_fdbserver_test.erl @@ -423,15 +423,18 @@ range_iterator_test() -> KVs = create_range(Tenant, <<"range_iterator_test">>, 3), {StartKey, EndKey} = erlfdb_tuple:range({<<"range_iterator_test">>}), + UnpackRows = fun(R) -> + [{erlfdb_tuple:unpack(K), erlfdb_tuple:unpack(V)} || {K, V} <- R] + end, + % Single-page GetRange ?assertMatch( KVs, erlfdb:transactional(Tenant, fun(Tx) -> - {ok, State} = erlfdb_range_iterator:start(Tx, StartKey, EndKey, []), - {ok, Rows, State1} = erlfdb_range_iterator:next(State), - {done, State2} = erlfdb_range_iterator:next(State1), - erlfdb_range_iterator:stop(State2), - [{erlfdb_tuple:unpack(K), erlfdb_tuple:unpack(V)} || {K, V} <- Rows] + Iterator = erlfdb_range_iterator:start(Tx, StartKey, EndKey, []), + {halt, [Rows], Iterator1} = erlfdb_iterator:next(Iterator), + erlfdb_iterator:stop(Iterator1), + UnpackRows(Rows) end) ), @@ -439,13 +442,12 @@ range_iterator_test() -> ?assertEqual( lists:reverse(lists:sublist(KVs, 2, 2)), erlfdb:transactional(Tenant, fun(Tx) -> - {ok, State} = erlfdb_range_iterator:start(Tx, StartKey, EndKey, [ + Iterator = erlfdb_range_iterator:start(Tx, StartKey, EndKey, [ {limit, 2}, {reverse, true} ]), - {ok, Rows, State1} = erlfdb_range_iterator:next(State), - {done, State2} = erlfdb_range_iterator:next(State1), - erlfdb_range_iterator:stop(State2), - [{erlfdb_tuple:unpack(K), erlfdb_tuple:unpack(V)} || {K, V} <- Rows] + {halt, [Rows], Iterator1} = erlfdb_iterator:next(Iterator), + erlfdb_iterator:stop(Iterator1), + UnpackRows(Rows) end) ), @@ -453,14 +455,40 @@ range_iterator_test() -> ?assertMatch( KVs, erlfdb:transactional(Tenant, fun(Tx) -> - {ok, State} = erlfdb_range_iterator:start(Tx, StartKey, EndKey, [{target_bytes, 1}]), - {ok, [Row1], State1} = erlfdb_range_iterator:next(State), - {ok, [Row2], State2} = erlfdb_range_iterator:next(State1), - {ok, [Row3], State3} = erlfdb_range_iterator:next(State2), - {ok, [], State4} = erlfdb_range_iterator:next(State3), - {done, State5} = erlfdb_range_iterator:next(State4), - erlfdb_range_iterator:stop(State5), - [{erlfdb_tuple:unpack(K), erlfdb_tuple:unpack(V)} || {K, V} <- [Row1, Row2, Row3]] + Iterator = erlfdb_range_iterator:start(Tx, StartKey, EndKey, [{target_bytes, 1}]), + {cont, [[Row1]], Iterator1} = erlfdb_iterator:next(Iterator), + {cont, [[Row2]], Iterator2} = erlfdb_iterator:next(Iterator1), + {cont, [[Row3]], Iterator3} = erlfdb_iterator:next(Iterator2), + {halt, Iterator4} = erlfdb_iterator:next(Iterator3), + erlfdb_iterator:stop(Iterator4), + UnpackRows([Row1, Row2, Row3]) + end) + ), + + % Run + ?assertMatch( + KVs, + erlfdb:transactional(Tenant, fun(Tx) -> + Iterator = erlfdb_range_iterator:start(Tx, StartKey, EndKey), + {[Rows], Iterator1} = erlfdb_iterator:run(Iterator), + ok = erlfdb_iterator:stop(Iterator1), + UnpackRows(Rows) + end) + ), + + % Pipeline + ?assertMatch( + KVs, + erlfdb:transactional(Tenant, fun(Tx) -> + IteratorA = erlfdb_range_iterator:start(Tx, StartKey, EndKey, [{target_bytes, 1}]), + IteratorB = erlfdb_range_iterator:start(Tx, StartKey, EndKey), + [{[[RowA1], [RowA2], [RowA3]], IteratorA1}, {[RowsB], IteratorB1}] = erlfdb_iterator:pipeline( + [IteratorA, IteratorB] + ), + ok = erlfdb_iterator:stop(IteratorA1), + ok = erlfdb_iterator:stop(IteratorB1), + RowsB = RowsA = [RowA1, RowA2, RowA3], + UnpackRows(RowsA) end) ), @@ -468,11 +496,12 @@ range_iterator_test() -> ?assertMatch( ok, erlfdb:transactional(Tenant, fun(Tx) -> - {ok, State} = erlfdb_range_iterator:start(Tx, StartKey, EndKey, [{target_bytes, 1}]), - {ok, [_Row1], State1} = erlfdb_range_iterator:next(State), - Future = erlfdb_range_iterator:get_future(State1), + Iterator = erlfdb_range_iterator:start(Tx, StartKey, EndKey, [{target_bytes, 1}]), + {cont, [[_Row1]], Iterator1} = erlfdb_iterator:next(Iterator), + {_, State} = erlfdb_iterator:module_state(Iterator1), + Future = erlfdb_range_iterator:get_future(State), true = is_tuple(Future), - ok = erlfdb_range_iterator:stop(State1) + ok = erlfdb_iterator:stop(Iterator1) end) ), @@ -490,19 +519,19 @@ range_iterator_test() -> ?assertMatch( [<<"hello world">>, <<"hello world">>, <<"hello world">>], erlfdb:transactional(Tenant, fun(Tx) -> - {ok, State} = erlfdb_range_iterator:start(Tx, MStartKey, MEndKey, [ + Iterator = erlfdb_range_iterator:start(Tx, MStartKey, MEndKey, [ {mapper, erlfdb_tuple:pack(Mapper)}, {target_bytes, 10} ]), - {ok, [{{_PKey, _PValue}, {_SKeyBegin, _SKeyEnd}, [{_Key, Message1}]}], State1} = erlfdb_range_iterator:next( - State + {cont, [[{{_PKey, _PValue}, {_SKeyBegin, _SKeyEnd}, [{_Key, Message1}]}]], + Iterator1} = erlfdb_iterator:next( + Iterator ), - {ok, [{{_PKey1, _PValue1}, {_SKeyBegin1, _SKeyEnd1}, [{_Key1, Message2}]}], - State2} = erlfdb_range_iterator:next(State1), - {ok, [{{_PKey2, _PValue2}, {_SKeyBegin2, _SKeyEnd2}, [{_Key2, Message3}]}], - State3} = erlfdb_range_iterator:next(State2), - {ok, [], State4} = erlfdb_range_iterator:next(State3), - {done, State5} = erlfdb_range_iterator:next(State4), - erlfdb_range_iterator:stop(State5), + {cont, [[{{_PKey1, _PValue1}, {_SKeyBegin1, _SKeyEnd1}, [{_Key1, Message2}]}]], + Iterator2} = erlfdb_iterator:next(Iterator1), + {cont, [[{{_PKey2, _PValue2}, {_SKeyBegin2, _SKeyEnd2}, [{_Key2, Message3}]}]], + Iterator3} = erlfdb_iterator:next(Iterator2), + {halt, Iterator4} = erlfdb_iterator:next(Iterator3), + erlfdb_iterator:stop(Iterator4), [Message1, Message2, Message3] end) );