diff --git a/CHANGELOG.md b/CHANGELOG.md index ea2ca22..dd9e5aa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## v0.3.4 (TBD) + +### Enhancements + + * (#65) Add an iterator interface for Range requests. + ## v0.3.3 (2025-12-21) ### Enhancements diff --git a/src/erlfdb.app.src b/src/erlfdb.app.src index 02a9a58..1af6105 100644 --- a/src/erlfdb.app.src +++ b/src/erlfdb.app.src @@ -12,7 +12,7 @@ {application, erlfdb, [ {description, "Erlang client for FoundationDB"}, - {vsn, "0.3.3"}, + {vsn, "0.3.4"}, {registered, []}, {applications, [kernel, stdlib]}, {maintainers, ["Jesse Stimpson"]}, diff --git a/src/erlfdb_iterator.erl b/src/erlfdb_iterator.erl new file mode 100644 index 0000000..d97946c --- /dev/null +++ b/src/erlfdb_iterator.erl @@ -0,0 +1,136 @@ +-module(erlfdb_iterator). + +-define(DOCATTRS, ?OTP_RELEASE >= 27). + +-if(?DOCATTRS). +-moduledoc """ +A generic iterator behaviour, useful for streaming results from the database. +""". +-endif. + +-type state() :: term(). +-type iterator() :: {module(), state()}. +-type result() :: term(). + +-callback handle_call(term(), term(), state()) -> {reply, term(), state()}. +-callback handle_next(state()) -> + {cont, [result()], state()} | {halt, [result()], state()} | {halt, state()}. +-callback handle_stop(state()) -> ok. + +-optional_callbacks([handle_call/3]). + +-export_type([iterator/0, result/0]). + +-export([new/2, next/1, stop/1, run/1, call/2, pipeline/1, module_state/1]). + +-if(?DOCATTRS). +-doc """ +Creates an iterator. +""". +-endif. +-spec new(module(), state()) -> iterator(). +new(Module, State) -> + {Module, State}. + +-if(?DOCATTRS). +-doc """ +Executes a handle_call on the iterator. +""". +-endif. +-spec call(iterator(), term()) -> {term(), iterator()}. +call(Iterator, Call) -> + {Module, State} = module_state(Iterator), + {reply, Reply, State1} = Module:handle_call(Call, make_ref(), State), + {Reply, new(Module, State1)}. + +-if(?DOCATTRS). +-doc """ +Progresses the iterator one step. +""". +-endif. +-spec next(iterator()) -> + {cont, [result()], iterator()} | {halt, [result()], iterator()} | {halt, iterator()}. +next({Module, State}) -> + case Module:handle_next(State) of + {cont, Results, State1} -> + {cont, Results, {Module, State1}}; + {halt, Results, State1} -> + {halt, Results, {Module, State1}}; + {halt, State1} -> + {halt, {Module, State1}} + end. + +-if(?DOCATTRS). +-doc """ +Stops the iterator. +""". +-endif. +-spec stop(iterator()) -> ok. +stop({Module, State}) -> + ok = Module:handle_stop(State). + +-if(?DOCATTRS). +-doc """ +Runs the iterator to completion. + +Returns the list of results and the state of the iterator. + +Caller must call `stop/1` to terminate the iterator. +""". +-endif. +-spec run(iterator()) -> {list(result()), iterator()}. +run(Iterator) -> + [{Result, Iterator1}] = pipeline([Iterator]), + {Result, Iterator1}. + +-if(?DOCATTRS). +-doc """ +Runs all iterators to completion. + +Caller must call `stop/1` to terminate the iterators. +""". +-endif. +-spec pipeline([iterator()]) -> [{list(result()), iterator()}]. +pipeline(List) -> + Len = length(List), + Acc = erlang:make_tuple(Len, []), + IxList = lists:zip(lists:seq(1, Len), List), + pipeline(IxList, Acc). + +-if(?DOCATTRS). +-doc """ +Identifies the module and the specific state of the iterator implementation. +""". +-endif. +-spec module_state(iterator()) -> {module(), state()}. +module_state({Module, State}) -> {Module, State}. + +pipeline([], Acc) -> + tuple_to_list(Acc); +pipeline(IxList, Acc) -> + {Remaining, Acc1} = lists:foldl( + fun({Ix, Iterator}, {Rem0, Acc0}) -> + case next(Iterator) of + {halt, Iterator1} -> + ok = stop(Iterator1), + AccResults = lists:append(lists:reverse(erlang:element(Ix, Acc0))), + {Rem0, erlang:setelement(Ix, Acc0, {AccResults, Iterator1})}; + {halt, [], Iterator1} -> + ok = stop(Iterator1), + AccResults = lists:append(lists:reverse(erlang:element(Ix, Acc0))), + {Rem0, erlang:setelement(Ix, Acc0, {AccResults, Iterator1})}; + {halt, Results, Iterator1} -> + ok = stop(Iterator1), + AccResults = lists:append(lists:reverse([Results | erlang:element(Ix, Acc0)])), + {Rem0, erlang:setelement(Ix, Acc0, {AccResults, Iterator1})}; + {cont, [], Iterator1} -> + {[{Ix, Iterator1} | Rem0], Acc0}; + {cont, Results, Iterator1} -> + AccResults = erlang:element(Ix, Acc0), + {[{Ix, Iterator1} | Rem0], erlang:setelement(Ix, Acc0, [Results | AccResults])} + end + end, + {[], Acc}, + IxList + ), + pipeline(lists:reverse(Remaining), Acc1). diff --git a/src/erlfdb_range_iterator.erl b/src/erlfdb_range_iterator.erl new file mode 100644 index 0000000..7f3006f --- /dev/null +++ b/src/erlfdb_range_iterator.erl @@ -0,0 +1,239 @@ +-module(erlfdb_range_iterator). +-behaviour(erlfdb_iterator). + +-define(DOCATTRS, ?OTP_RELEASE >= 27). + +-if(?DOCATTRS). +-moduledoc """ +An iterator interface over GetRange and GetMappedRange. + +The iterator is an alternative implmentation to `erlfdb:get_range/4` and +`erlfdb:wait_for_all_interleaving/3`. The database mechanics are equivalent, +but the iterator allows you to control when each wait actually happens. + +Remember that FoundationDB transaction execution time is limited to 5 seconds, +so you must take care not to delay your iteration. + +To use the iterator, + +1. `erlfdb_range_iterator:start/4`: Sends the first GetRange request to the + database server. +2. `erlfdb_iterator.next/1`: Waits for the result of the GetRange request. + Then issues another GetRange request to the database server. +3. `erlfdb_iterator.stop/1`: Cancels the active future, if one exists. +""". +-endif. + +-export([start/3, start/4, get_future/1]). +-export([handle_next/1, handle_stop/1]). + +-record(state, { + tx, + start_key, + end_key, + mapper, + limit, + target_bytes, + streaming_mode, + iteration, + snapshot, + reverse, + future +}). + +-type state() :: #state{}. +-type page() :: list(erlfdb:kv()) | list(erlfdb:mapped_kv()). + +-export_type([page/0]). + +-if(?DOCATTRS). +-doc """ +Starts the iterator. + +Equivalent to `start(Tx, StartKey, EndKey, [])`. +""". +-endif. +-spec start(erlfdb:transaction(), erlfdb:key(), erlfdb:key()) -> + erlfdb_iterator:iterator(). +start(Tx, StartKey, EndKey) -> + start(Tx, StartKey, EndKey, []). + +-if(?DOCATTRS). +-doc """ +Starts the iterator. + +Sends the first GetRange request to the database server. +""". +-endif. +-spec start(erlfdb:transaction(), erlfdb:key(), erlfdb:key(), [erlfdb:fold_option()]) -> + erlfdb_iterator:iterator(). +start(Tx, StartKey, EndKey, Options) -> + State = new_state(Tx, StartKey, EndKey, Options), + State1 = send_get_range(State), + erlfdb_iterator:new(?MODULE, State1). + +-spec handle_next(state()) -> + {halt, state()} | {cont, [page()], state()}. +handle_next(State = #state{future = undefined}) -> + {halt, State}; +handle_next(State = #state{}) -> + #state{future = Future} = State, + Result = erlfdb:wait(Future, []), + handle_result(Result, State). + +-spec handle_stop(state()) -> ok. +handle_stop(_State = #state{future = undefined}) -> + ok; +handle_stop(_State = #state{future = Future}) -> + erlfdb:cancel(Future), + ok. + +-if(?DOCATTRS). +-doc """ +Gets the active future from the iterator's internal state. + +You needn't call this function for normal use of the iterator. +""". +-endif. +-spec get_future(state()) -> undefined | erlfdb:future(). +get_future(#state{future = Future}) -> Future. + +handle_result({RawRows, Count, HasMore}, State = #state{}) -> + #state{limit = Limit} = State, + % If our limit is within the current set of + % rows we need to truncate the list + Rows = + if + Limit == 0 orelse Limit > Count -> RawRows; + true -> lists:sublist(RawRows, Limit) + end, + + % Determine if we have more rows to iterate + Recurse = (Rows /= []) and (Limit == 0 orelse Limit > Count) and HasMore, + + State1 = State#state{future = undefined}, + State2 = + if + Recurse -> + LastKey = get_last_key(Rows, State1), + send_get_range(next_state(LastKey, Count, State1)); + true -> + State1 + end, + + case {Rows, Recurse} of + {[], false} -> + {halt, State2}; + {[], true} -> + {cont, [], State2}; + {_, false} -> + {halt, [Rows], State2}; + {_, true} -> + {cont, [Rows], State2} + end. + +new_state(Tx, StartKey, EndKey, Options) -> + Reverse = + case erlfdb_util:get(Options, reverse, false) of + true -> 1; + false -> 0; + I when is_integer(I) -> I + end, + Mapper = erlfdb_util:get(Options, mapper), + ok = assert_mapper(Mapper), + #state{ + tx = Tx, + start_key = erlfdb_key:to_selector(StartKey), + end_key = erlfdb_key:to_selector(EndKey), + mapper = Mapper, + limit = erlfdb_util:get(Options, limit, 0), + target_bytes = erlfdb_util:get(Options, target_bytes, 0), + streaming_mode = erlfdb_util:get(Options, streaming_mode, want_all), + iteration = erlfdb_util:get(Options, iteration, 1), + snapshot = erlfdb_util:get(Options, snapshot, false), + reverse = Reverse + }. + +assert_mapper(undefined) -> ok; +assert_mapper(Mapper) when is_binary(Mapper) -> ok; +assert_mapper(Mapper) -> erlang:error({badarg, mapper, Mapper}). + +next_state(LastKey, Count, State = #state{}) -> + #state{ + start_key = StartKey, + end_key = EndKey, + limit = Limit, + reverse = Reverse, + iteration = Iteration + } = State, + {NextStartKey, NextEndKey} = + case Reverse /= 0 of + true -> + {StartKey, erlfdb_key:first_greater_or_equal(LastKey)}; + false -> + {erlfdb_key:first_greater_than(LastKey), EndKey} + end, + State#state{ + start_key = NextStartKey, + end_key = NextEndKey, + limit = max(0, Limit - Count), + iteration = Iteration + 1 + }. + +send_get_range(State = #state{mapper = undefined}) -> + #state{ + tx = Tx, + start_key = StartKey, + end_key = EndKey, + limit = Limit, + target_bytes = TargetBytes, + streaming_mode = StreamingMode, + iteration = Iteration, + snapshot = Snapshot, + reverse = Reverse + } = State, + Future = erlfdb_nif:transaction_get_range( + Tx, + StartKey, + EndKey, + Limit, + TargetBytes, + StreamingMode, + Iteration, + Snapshot, + Reverse + ), + State#state{future = Future}; +send_get_range(State = #state{mapper = Mapper}) -> + #state{ + tx = Tx, + start_key = StartKey, + end_key = EndKey, + limit = Limit, + target_bytes = TargetBytes, + streaming_mode = StreamingMode, + iteration = Iteration, + snapshot = Snapshot, + reverse = Reverse + } = State, + Future = erlfdb_nif:transaction_get_mapped_range( + Tx, + StartKey, + EndKey, + Mapper, + Limit, + TargetBytes, + StreamingMode, + Iteration, + Snapshot, + Reverse + ), + State#state{future = Future}. + +get_last_key(Rows, _State = #state{mapper = undefined}) -> + {K, _V} = lists:last(Rows), + K; +get_last_key(Rows, _State = #state{}) -> + {KV, _, _} = lists:last(Rows), + {K, _} = KV, + K. diff --git a/test/erlfdb_02_anon_fdbserver_test.erl b/test/erlfdb_02_anon_fdbserver_test.erl index 70bb02d..5e96956 100644 --- a/test/erlfdb_02_anon_fdbserver_test.erl +++ b/test/erlfdb_02_anon_fdbserver_test.erl @@ -417,6 +417,128 @@ watch_to_test() -> error(timeout) end. +range_iterator_test() -> + Db = erlfdb_sandbox:open(), + Tenant = erlfdb_util:create_and_open_test_tenant(Db, [empty]), + KVs = create_range(Tenant, <<"range_iterator_test">>, 3), + {StartKey, EndKey} = erlfdb_tuple:range({<<"range_iterator_test">>}), + + UnpackRows = fun(R) -> + [{erlfdb_tuple:unpack(K), erlfdb_tuple:unpack(V)} || {K, V} <- R] + end, + + % Single-page GetRange + ?assertMatch( + KVs, + erlfdb:transactional(Tenant, fun(Tx) -> + Iterator = erlfdb_range_iterator:start(Tx, StartKey, EndKey, []), + {halt, [Rows], Iterator1} = erlfdb_iterator:next(Iterator), + erlfdb_iterator:stop(Iterator1), + UnpackRows(Rows) + end) + ), + + % Reverse GetRange - returned page is reversed, this is consistent with the FDB API + ?assertEqual( + lists:reverse(lists:sublist(KVs, 2, 2)), + erlfdb:transactional(Tenant, fun(Tx) -> + Iterator = erlfdb_range_iterator:start(Tx, StartKey, EndKey, [ + {limit, 2}, {reverse, true} + ]), + {halt, [Rows], Iterator1} = erlfdb_iterator:next(Iterator), + erlfdb_iterator:stop(Iterator1), + UnpackRows(Rows) + end) + ), + + % Multi-page GetRange + ?assertMatch( + KVs, + erlfdb:transactional(Tenant, fun(Tx) -> + Iterator = erlfdb_range_iterator:start(Tx, StartKey, EndKey, [{target_bytes, 1}]), + {cont, [[Row1]], Iterator1} = erlfdb_iterator:next(Iterator), + {cont, [[Row2]], Iterator2} = erlfdb_iterator:next(Iterator1), + {cont, [[Row3]], Iterator3} = erlfdb_iterator:next(Iterator2), + {halt, Iterator4} = erlfdb_iterator:next(Iterator3), + erlfdb_iterator:stop(Iterator4), + UnpackRows([Row1, Row2, Row3]) + end) + ), + + % Run + ?assertMatch( + KVs, + erlfdb:transactional(Tenant, fun(Tx) -> + Iterator = erlfdb_range_iterator:start(Tx, StartKey, EndKey), + {[Rows], Iterator1} = erlfdb_iterator:run(Iterator), + ok = erlfdb_iterator:stop(Iterator1), + UnpackRows(Rows) + end) + ), + + % Pipeline + ?assertMatch( + KVs, + erlfdb:transactional(Tenant, fun(Tx) -> + IteratorA = erlfdb_range_iterator:start(Tx, StartKey, EndKey, [{target_bytes, 1}]), + IteratorB = erlfdb_range_iterator:start(Tx, StartKey, EndKey), + [{[[RowA1], [RowA2], [RowA3]], IteratorA1}, {[RowsB], IteratorB1}] = erlfdb_iterator:pipeline( + [IteratorA, IteratorB] + ), + ok = erlfdb_iterator:stop(IteratorA1), + ok = erlfdb_iterator:stop(IteratorB1), + RowsB = RowsA = [RowA1, RowA2, RowA3], + UnpackRows(RowsA) + end) + ), + + % Early Cancel + ?assertMatch( + ok, + erlfdb:transactional(Tenant, fun(Tx) -> + Iterator = erlfdb_range_iterator:start(Tx, StartKey, EndKey, [{target_bytes, 1}]), + {cont, [[_Row1]], Iterator1} = erlfdb_iterator:next(Iterator), + {_, State} = erlfdb_iterator:module_state(Iterator1), + Future = erlfdb_range_iterator:get_future(State), + true = is_tuple(Future), + ok = erlfdb_iterator:stop(Iterator1) + end) + ), + + % GetMappedRange + Vsn = erlfdb_nif:get_max_api_version(), + + if + Vsn >= 730 -> + Mapper = create_mapping_on_range( + Tenant, <<"range_iterator_test">>, 3, <<"hello world">> + ), + MStartKey = erlfdb_tuple:pack({<<"range_iterator_test">>, 1}), + MEndKey = erlfdb_key:strinc(erlfdb_tuple:pack({<<"range_iterator_test">>, 3})), + + ?assertMatch( + [<<"hello world">>, <<"hello world">>, <<"hello world">>], + erlfdb:transactional(Tenant, fun(Tx) -> + Iterator = erlfdb_range_iterator:start(Tx, MStartKey, MEndKey, [ + {mapper, erlfdb_tuple:pack(Mapper)}, {target_bytes, 10} + ]), + {cont, [[{{_PKey, _PValue}, {_SKeyBegin, _SKeyEnd}, [{_Key, Message1}]}]], + Iterator1} = erlfdb_iterator:next( + Iterator + ), + {cont, [[{{_PKey1, _PValue1}, {_SKeyBegin1, _SKeyEnd1}, [{_Key1, Message2}]}]], + Iterator2} = erlfdb_iterator:next(Iterator1), + {cont, [[{{_PKey2, _PValue2}, {_SKeyBegin2, _SKeyEnd2}, [{_Key2, Message3}]}]], + Iterator3} = erlfdb_iterator:next(Iterator2), + {halt, Iterator4} = erlfdb_iterator:next(Iterator3), + erlfdb_iterator:stop(Iterator4), + [Message1, Message2, Message3] + end) + ); + true -> + ok + end. + get_set_get(DbOrTenant) -> Key = gen_key(8), Val = crypto:strong_rand_bytes(8),