Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 55 additions & 9 deletions c_src/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,22 @@ static void erlfdb_future_cb(FDBFuture *fdb_future, void *data) {
ErlNifEnv *caller;
ERL_NIF_TERM msg;

// FoundationDB callbacks can fire from the thread
// that created them. Check if we were actually
// submitted to the network thread or not so that
// we pass the correct environment to enif_send
// FoundationDB callbacks can fire either:
// 1. Synchronously from the calling thread (when the future is already
// ready at the time fdb_future_set_callback is called), or
// 2. Asynchronously from the FDB network thread
//
// We use enif_thread_type() to distinguish these cases:
// - ERL_NIF_THR_UNDEFINED means we're on the FDB network thread (case 2)
// - Any other value means we're on an Erlang scheduler thread (case 1)
//
// For enif_send, when called from a non-scheduler thread we must pass
// NULL as the caller environment. When called synchronously from a NIF,
// we can pass the process-bound environment (pid_env) for efficiency.
//
// IMPORTANT: pid_env is only valid during the NIF call that created
// this future. This is safe here because synchronous callbacks only
// occur during erlfdb_create_future, before that NIF returns.
if (enif_thread_type() == ERL_NIF_THR_UNDEFINED) {
caller = NULL;
} else {
Expand Down Expand Up @@ -100,6 +112,11 @@ static ERL_NIF_TERM erlfdb_create_future(ErlNifEnv *env, ERL_NIF_TERM *tx_ref,
fdb_error_t err;

f = enif_alloc_resource(ErlFDBFutureRes, sizeof(ErlFDBFuture));
if (f == NULL) {
fdb_future_destroy(future);
return enif_make_badarg(env);
}

f->future = future;
f->ftype = ftype;
if (to == NULL) {
Expand All @@ -109,13 +126,23 @@ static ERL_NIF_TERM erlfdb_create_future(ErlNifEnv *env, ERL_NIF_TERM *tx_ref,
}
f->pid_env = env;
f->msg_env = enif_alloc_env();
if (f->msg_env == NULL) {
enif_release_resource(f);
return enif_make_badarg(env);
}

if (tx_ref != NULL) {
f->msg_ref = T2(f->msg_env, enif_make_copy(f->msg_env, *tx_ref),
enif_make_copy(f->msg_env, ref));
} else {
f->msg_ref = enif_make_copy(f->msg_env, ref);
}
f->lock = enif_mutex_create("fdb:future_lock");
if (f->lock == NULL) {
enif_release_resource(f);
return enif_make_badarg(env);
}

f->cancelled = false;

// This resource reference counting dance is a bit
Expand Down Expand Up @@ -591,9 +618,8 @@ static ERL_NIF_TERM erlfdb_network_set_option(ErlNifEnv *env, int argc,
option = FDB_NET_OPTION_DISABLE_CLIENT_STATISTICS_LOGGING;
} else if (IS_ATOM(argv[0], enable_slow_task_profiling)) {
option = FDB_NET_OPTION_ENABLE_SLOW_TASK_PROFILING;
}
#if FDB_API_VERSION >= 630
else if (IS_ATOM(argv[0], enable_run_loop_profiling)) {
} else if (IS_ATOM(argv[0], enable_run_loop_profiling)) {
option = FDB_NET_OPTION_ENABLE_RUN_LOOP_PROFILING;
} else if (IS_ATOM(argv[0], client_threads_per_version)) {
option = FDB_NET_OPTION_CLIENT_THREADS_PER_VERSION;
Expand All @@ -602,9 +628,9 @@ static ERL_NIF_TERM erlfdb_network_set_option(ErlNifEnv *env, int argc,
#if FDB_API_VERSION >= 730
option = FDB_NET_OPTION_IGNORE_EXTERNAL_CLIENT_FAILURES;
#else
// 7.3 added some new checks for loading external client libraries
// and those checks are not present in the older versions
return ATOM_ok;
// 7.3 added some new checks for loading external client libraries
// and those checks are not present in the older versions
return ATOM_ok;
#endif
} else {
return enif_make_badarg(env);
Expand Down Expand Up @@ -843,6 +869,11 @@ static ERL_NIF_TERM erlfdb_create_database(ErlNifEnv *env, int argc,
}

d = enif_alloc_resource(ErlFDBDatabaseRes, sizeof(ErlFDBDatabase));
if (d == NULL) {
fdb_database_destroy(database);
return enif_make_badarg(env);
}

d->database = database;

ret = enif_make_resource(env, d);
Expand Down Expand Up @@ -890,6 +921,11 @@ static ERL_NIF_TERM erlfdb_database_open_tenant(ErlNifEnv *env, int argc,
}

ten = enif_alloc_resource(ErlFDBTenantRes, sizeof(ErlFDBTenant));
if (ten == NULL) {
fdb_tenant_destroy(tenant);
return enif_make_badarg(env);
}

ten->tenant = tenant;

ret = enif_make_resource(env, ten);
Expand Down Expand Up @@ -995,6 +1031,11 @@ erlfdb_database_create_transaction(ErlNifEnv *env, int argc,
}

t = enif_alloc_resource(ErlFDBTransactionRes, sizeof(ErlFDBTransaction));
if (t == NULL) {
fdb_transaction_destroy(transaction);
return enif_make_badarg(env);
}

t->transaction = transaction;

enif_self(env, &pid);
Expand Down Expand Up @@ -1095,6 +1136,11 @@ erlfdb_tenant_create_transaction(ErlNifEnv *env, int argc,
}

t = enif_alloc_resource(ErlFDBTransactionRes, sizeof(ErlFDBTransaction));
if (t == NULL) {
fdb_transaction_destroy(transaction);
return enif_make_badarg(env);
}

t->transaction = transaction;

enif_self(env, &pid);
Expand Down
35 changes: 26 additions & 9 deletions c_src/resources.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,37 @@ typedef struct _ErlFDBFuture {
ErlFDBFutureType ftype;
ErlNifPid pid;

/* pid_env: A 'process bound environment' for sending messages
* in the event of the callback executing synchronously
* (e.g. fdb_future_cancel)
/* pid_env: The process-bound environment from the NIF call that
* created this future. Used as the 'caller_env' argument to
* enif_send() when the FDB callback fires synchronously.
*
* Being process bound, it must not be used across different
* NIF calls.
* LIFETIME: Only valid during the erlfdb_create_future() call.
* After that NIF returns, this pointer is stale and must not be
* dereferenced.
*
* An alternative is to use thread-specific data (tsd) to
* store the environment for each NIF call.
* SAFETY: This is safe because we only use pid_env when
* enif_thread_type() != ERL_NIF_THR_UNDEFINED, which indicates
* we're on an Erlang scheduler thread. Synchronous callbacks
* only occur during fdb_future_set_callback() within
* erlfdb_create_future(), so pid_env is still valid at that point.
* Asynchronous callbacks from the FDB network thread pass NULL
* to enif_send() instead.
*
* FUTURE CONSIDERATION: A more robust alternative would be to use
* enif_tsd_key_create() to store the current NIF environment in
* thread-specific data at the start of each NIF call. The callback
* could then retrieve it via enif_tsd_get(), eliminating the need
* to store a potentially-stale pointer in the future struct.
*
* See erlfdb_future_cb() in main.c for the callback implementation.
*/
ErlNifEnv *pid_env;

/* msg_env: A 'process independent environment' used to send
* terms with enif_send.
/* msg_env: A process-independent environment allocated with
* enif_alloc_env(). Owns the terms (msg_ref) sent via enif_send().
*
* LIFETIME: Allocated in erlfdb_create_future(), freed in
* erlfdb_future_dtor(). Survives across NIF calls and threads.
*/
ErlNifEnv *msg_env;

Expand Down
8 changes: 5 additions & 3 deletions src/erlfdb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1930,10 +1930,12 @@ get_approximate_size(?IS_SS = SS) ->
Gets a local auto-incrementing integer for this transaction.

Each time this function is executed, a local counter on the transaction is incremented,
and the value returned.
and the value returned. Returns values in the range 0-65535 (16-bit). Raises `badarg`
if called more than 65536 times on the same transaction.

This integer is useful in combination with versionstamps to ensure uniqueness
of versionstamped keys and values for your transaction.
of versionstamped keys and values for your transaction. The 16-bit limit matches
the user-batch portion of FoundationDB versionstamps.

## Examples

Expand All @@ -1955,7 +1957,7 @@ Without `get_next_tx_id`, the versionstamped keys generated at commit time would
exactly one versionstamp is created at commit time.
""".
-endif.
-spec get_next_tx_id(transaction() | snapshot()) -> non_neg_integer().
-spec get_next_tx_id(transaction() | snapshot()) -> 0..65535.
get_next_tx_id(?IS_TX = Tx) ->
erlfdb_nif:transaction_get_next_tx_id(Tx);
get_next_tx_id(?IS_SS = SS) ->
Expand Down