Skip to content

Commit 07fd494

Browse files
committed
Simplify import flushing via subinterpreter replacement
Replace del_import with generation-based pool slot recycling. When flush_imports is called, all pool slots are marked stale. When a stale slot's usage count drops to 0, the subinterpreter is destroyed and reinitialized with a fresh one. Changes: - Add usage_count, marked_stale, generation to subinterp_slot_t - Add subinterp_pool_acquire/release/flush_generation functions - Remove del_import/1,2 from py.erl (no longer needed) - Update flush_imports to use generation-based approach - Remove del_import tests from test suite
1 parent bc357f5 commit 07fd494

7 files changed

Lines changed: 310 additions & 147 deletions

File tree

c_src/py_nif.c

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,9 @@ static void context_destructor(ErlNifEnv *env, void *obj) {
441441
ctx->globals = NULL;
442442
ctx->locals = NULL;
443443

444+
/* Release usage count - if slot is stale and count drops to 0,
445+
* the subinterpreter will be destroyed automatically */
446+
subinterp_pool_release(ctx->pool_slot);
444447
subinterp_pool_free(ctx->pool_slot);
445448
ctx->pool_slot = -1;
446449
ctx->destroyed = true;
@@ -4157,9 +4160,13 @@ static ERL_NIF_TERM nif_context_create(ErlNifEnv *env, int argc, const ERL_NIF_T
41574160

41584161
ctx->pool_slot = slot;
41594162

4163+
/* Acquire usage count for the pool slot */
4164+
subinterp_pool_acquire(slot);
4165+
41604166
/* Get the pool slot for interpreter access */
41614167
subinterp_slot_t *pool_slot = subinterp_pool_get(slot);
41624168
if (pool_slot == NULL || !pool_slot->initialized) {
4169+
subinterp_pool_release(slot); /* Release usage count */
41634170
subinterp_pool_free(slot);
41644171
close(ctx->callback_pipe[0]);
41654172
close(ctx->callback_pipe[1]);
@@ -4183,6 +4190,7 @@ static ERL_NIF_TERM nif_context_create(ErlNifEnv *env, int argc, const ERL_NIF_T
41834190
Py_XDECREF(ctx->module_cache);
41844191
PyThreadState_Swap(saved);
41854192
PyGILState_Release(gstate);
4193+
subinterp_pool_release(slot); /* Release usage count */
41864194
subinterp_pool_free(slot);
41874195
close(ctx->callback_pipe[0]);
41884196
close(ctx->callback_pipe[1]);
@@ -4313,7 +4321,11 @@ static ERL_NIF_TERM nif_context_destroy(ErlNifEnv *env, int argc, const ERL_NIF_
43134321
ctx->locals = NULL;
43144322
ctx->module_cache = NULL;
43154323

4316-
/* Release the pool slot back to the pool */
4324+
/* Release the pool slot back to the pool.
4325+
* subinterp_pool_release decrements usage count and may destroy
4326+
* the subinterpreter if it's marked stale and count drops to 0.
4327+
* subinterp_pool_free marks the slot as available for new contexts. */
4328+
subinterp_pool_release(ctx->pool_slot);
43174329
subinterp_pool_free(ctx->pool_slot);
43184330
ctx->pool_slot = -1;
43194331

@@ -5074,6 +5086,31 @@ static ERL_NIF_TERM nif_interp_flush_imports(ErlNifEnv *env, int argc, const ERL
50745086
return ATOM_OK;
50755087
}
50765088

5089+
/**
5090+
* @brief Flush import generation and mark all pool slots stale
5091+
*
5092+
* nif_subinterp_pool_flush_generation() -> {ok, NewGeneration}
5093+
*
5094+
* Increments the global import generation counter and marks all initialized
5095+
* pool slots as stale. When a stale slot's usage count drops to 0, the
5096+
* subinterpreter is automatically destroyed and the slot becomes available
5097+
* for a fresh subinterpreter.
5098+
*
5099+
* This enables flush_imports to work by replacing subinterpreters rather
5100+
* than trying to manipulate sys.modules directly (which can crash C extensions).
5101+
*/
5102+
static ERL_NIF_TERM nif_subinterp_pool_flush_generation(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
5103+
(void)argc;
5104+
(void)argv;
5105+
5106+
#ifdef HAVE_SUBINTERPRETERS
5107+
uint64_t new_gen = subinterp_pool_flush_generation();
5108+
return enif_make_tuple2(env, ATOM_OK, enif_make_uint64(env, new_gen));
5109+
#else
5110+
return enif_make_tuple2(env, ATOM_OK, enif_make_uint64(env, 0));
5111+
#endif
5112+
}
5113+
50775114
/**
50785115
* @brief Execute Python statements using a process-local environment
50795116
*
@@ -7205,6 +7242,7 @@ static ErlNifFunc nif_funcs[] = {
72057242
{"create_local_env", 1, nif_create_local_env, 0},
72067243
{"interp_apply_imports", 2, nif_interp_apply_imports, ERL_NIF_DIRTY_JOB_CPU_BOUND},
72077244
{"interp_flush_imports", 2, nif_interp_flush_imports, ERL_NIF_DIRTY_JOB_CPU_BOUND},
7245+
{"subinterp_pool_flush_generation", 0, nif_subinterp_pool_flush_generation, 0},
72087246
{"context_call_method", 4, nif_context_call_method, ERL_NIF_DIRTY_JOB_CPU_BOUND},
72097247
{"context_to_term", 1, nif_context_to_term, 0},
72107248
{"context_interp_id", 1, nif_context_interp_id, 0},

c_src/py_subinterp_pool.c

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ static int g_pool_size = 0;
4747
/** @brief Pool initialized flag */
4848
static _Atomic bool g_pool_initialized = false;
4949

50+
/** @brief Global import generation counter */
51+
static _Atomic uint64_t g_import_generation = 0;
52+
5053
/* ============================================================================
5154
* Forward declarations for functions defined in other modules
5255
* ============================================================================ */
@@ -196,6 +199,9 @@ int subinterp_pool_init(int size) {
196199
}
197200

198201
slot->initialized = true;
202+
atomic_store(&slot->usage_count, 0);
203+
atomic_store(&slot->marked_stale, false);
204+
slot->generation = atomic_load(&g_import_generation);
199205

200206
/* Swap back to main thread state before creating next subinterpreter */
201207
PyThreadState_Swap(main_tstate);
@@ -345,4 +351,193 @@ bool subinterp_pool_is_initialized(void) {
345351
return atomic_load(&g_pool_initialized);
346352
}
347353

354+
void subinterp_pool_acquire(int slot) {
355+
if (slot < 0 || slot >= g_pool_size) {
356+
return;
357+
}
358+
359+
subinterp_slot_t *s = &g_subinterp_pool[slot];
360+
if (!s->initialized) {
361+
return;
362+
}
363+
364+
atomic_fetch_add(&s->usage_count, 1);
365+
}
366+
367+
/**
368+
* @brief Reinitialize a pool slot with a fresh subinterpreter
369+
*
370+
* Called after destroying a stale subinterpreter to create a fresh one.
371+
* Assumes GIL is already held.
372+
*
373+
* @param slot_idx Slot index
374+
* @return 0 on success, -1 on failure
375+
*/
376+
static int subinterp_pool_reinit_slot(int slot_idx) {
377+
subinterp_slot_t *slot = &g_subinterp_pool[slot_idx];
378+
379+
/* Save main thread state */
380+
PyThreadState *main_tstate = PyThreadState_Get();
381+
382+
/* Configure subinterpreter with SHARED GIL */
383+
PyInterpreterConfig config = {
384+
.use_main_obmalloc = 0,
385+
.allow_fork = 0,
386+
.allow_exec = 0,
387+
.allow_threads = 1,
388+
.allow_daemon_threads = 0,
389+
.check_multi_interp_extensions = 1,
390+
.gil = PyInterpreterConfig_SHARED_GIL,
391+
};
392+
393+
PyThreadState *tstate = NULL;
394+
PyStatus status = Py_NewInterpreterFromConfig(&tstate, &config);
395+
396+
if (PyStatus_Exception(status) || tstate == NULL) {
397+
fprintf(stderr, "subinterp_pool_reinit_slot: failed to create subinterp for slot %d\n", slot_idx);
398+
PyThreadState_Swap(main_tstate);
399+
return -1;
400+
}
401+
402+
/* tstate is now the current thread state for the new interpreter */
403+
slot->interp = PyThreadState_GetInterpreter(tstate);
404+
slot->tstate = tstate;
405+
406+
/* Initialize globals/locals */
407+
slot->globals = PyDict_New();
408+
slot->locals = PyDict_New();
409+
slot->module_cache = PyDict_New();
410+
411+
if (slot->globals == NULL || slot->locals == NULL || slot->module_cache == NULL) {
412+
Py_XDECREF(slot->module_cache);
413+
Py_XDECREF(slot->globals);
414+
Py_XDECREF(slot->locals);
415+
Py_EndInterpreter(tstate);
416+
PyThreadState_Swap(main_tstate);
417+
return -1;
418+
}
419+
420+
/* Import __builtins__ into globals */
421+
PyObject *builtins = PyEval_GetBuiltins();
422+
PyDict_SetItemString(slot->globals, "__builtins__", builtins);
423+
424+
/* Create erlang module in this subinterpreter */
425+
if (create_erlang_module() >= 0) {
426+
/* Register ReactorBuffer and PyBuffer */
427+
ReactorBuffer_register_with_reactor();
428+
PyBuffer_register_with_module();
429+
430+
/* Import erlang module into globals */
431+
PyObject *erlang_module = PyImport_ImportModule("erlang");
432+
if (erlang_module != NULL) {
433+
PyDict_SetItemString(slot->globals, "erlang", erlang_module);
434+
Py_DECREF(erlang_module);
435+
}
436+
}
437+
438+
/* Initialize event loop for this subinterpreter */
439+
init_subinterpreter_event_loop(NULL);
440+
441+
slot->initialized = true;
442+
atomic_store(&slot->usage_count, 0);
443+
atomic_store(&slot->marked_stale, false);
444+
slot->generation = atomic_load(&g_import_generation);
445+
446+
/* Swap back to main thread state */
447+
PyThreadState_Swap(main_tstate);
448+
449+
#ifdef DEBUG
450+
fprintf(stderr, "subinterp_pool_reinit_slot: reinitialized slot %d\n", slot_idx);
451+
#endif
452+
453+
return 0;
454+
}
455+
456+
void subinterp_pool_release(int slot) {
457+
if (slot < 0 || slot >= g_pool_size) {
458+
return;
459+
}
460+
461+
subinterp_slot_t *s = &g_subinterp_pool[slot];
462+
if (!s->initialized) {
463+
return;
464+
}
465+
466+
int prev_count = atomic_fetch_sub(&s->usage_count, 1);
467+
468+
/* If usage drops to 0 and slot is marked stale, destroy and reinitialize
469+
* the subinterpreter. We need the GIL to safely do Python operations. */
470+
if (prev_count == 1 && atomic_load(&s->marked_stale)) {
471+
/* Acquire the GIL */
472+
PyGILState_STATE gstate = PyGILState_Ensure();
473+
474+
/* Save current thread state and swap to the subinterpreter */
475+
PyThreadState *saved = PyThreadState_Swap(s->tstate);
476+
477+
/* Clean up Python objects */
478+
Py_XDECREF(s->module_cache);
479+
Py_XDECREF(s->globals);
480+
Py_XDECREF(s->locals);
481+
482+
/* End the interpreter - this frees s->tstate */
483+
Py_EndInterpreter(s->tstate);
484+
485+
/* Clear slot state */
486+
s->interp = NULL;
487+
s->tstate = NULL;
488+
s->globals = NULL;
489+
s->locals = NULL;
490+
s->module_cache = NULL;
491+
s->initialized = false;
492+
atomic_store(&s->marked_stale, false);
493+
494+
/* Swap back to saved thread state (may be NULL if we didn't have one) */
495+
PyThreadState_Swap(saved);
496+
497+
/* Reinitialize the slot with a fresh subinterpreter */
498+
if (subinterp_pool_reinit_slot(slot) < 0) {
499+
/* Reinit failed - clear allocation bit so slot is skipped */
500+
uint64_t mask = ~(1ULL << slot);
501+
atomic_fetch_and(&g_pool_allocation, mask);
502+
fprintf(stderr, "subinterp_pool_release: failed to reinit slot %d\n", slot);
503+
}
504+
/* If reinit succeeded, slot stays allocated but now has fresh interp */
505+
506+
/* Release the GIL */
507+
PyGILState_Release(gstate);
508+
509+
#ifdef DEBUG
510+
fprintf(stderr, "subinterp_pool_release: recycled stale slot %d\n", slot);
511+
#endif
512+
}
513+
}
514+
515+
uint64_t subinterp_pool_flush_generation(void) {
516+
if (!atomic_load(&g_pool_initialized)) {
517+
return 0;
518+
}
519+
520+
/* Increment generation */
521+
uint64_t new_gen = atomic_fetch_add(&g_import_generation, 1) + 1;
522+
523+
/* Mark all initialized slots as stale */
524+
for (int i = 0; i < g_pool_size; i++) {
525+
subinterp_slot_t *slot = &g_subinterp_pool[i];
526+
if (slot->initialized) {
527+
atomic_store(&slot->marked_stale, true);
528+
}
529+
}
530+
531+
#ifdef DEBUG
532+
fprintf(stderr, "subinterp_pool_flush_generation: incremented to %llu, marked all slots stale\n",
533+
(unsigned long long)new_gen);
534+
#endif
535+
536+
return new_gen;
537+
}
538+
539+
uint64_t subinterp_pool_get_generation(void) {
540+
return atomic_load(&g_import_generation);
541+
}
542+
348543
#endif /* HAVE_SUBINTERPRETERS */

c_src/py_subinterp_pool.h

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,15 @@ typedef struct {
8484

8585
/** @brief Whether this slot is initialized and ready for use */
8686
bool initialized;
87+
88+
/** @brief Number of contexts currently using this slot */
89+
_Atomic int usage_count;
90+
91+
/** @brief Marked for destruction when usage_count reaches 0 */
92+
_Atomic bool marked_stale;
93+
94+
/** @brief Import generation when slot was created */
95+
uint64_t generation;
8796
} subinterp_slot_t;
8897

8998
/**
@@ -163,6 +172,43 @@ void subinterp_pool_shutdown(void);
163172
*/
164173
bool subinterp_pool_is_initialized(void);
165174

175+
/**
176+
* @brief Increment usage count for a slot
177+
*
178+
* Called when a context starts using a pool slot.
179+
*
180+
* @param slot Slot index
181+
*/
182+
void subinterp_pool_acquire(int slot);
183+
184+
/**
185+
* @brief Decrement usage count for a slot
186+
*
187+
* Called when a context stops using a pool slot.
188+
* If the slot is marked stale and usage_count reaches 0,
189+
* the subinterpreter is destroyed and the slot is cleared.
190+
*
191+
* @param slot Slot index
192+
*/
193+
void subinterp_pool_release(int slot);
194+
195+
/**
196+
* @brief Increment import generation and mark all pool slots stale
197+
*
198+
* Called by flush_imports to trigger subinterpreter replacement.
199+
* Stale slots will be destroyed when their usage_count reaches 0.
200+
*
201+
* @return The new generation number
202+
*/
203+
uint64_t subinterp_pool_flush_generation(void);
204+
205+
/**
206+
* @brief Get the current import generation
207+
*
208+
* @return Current generation number
209+
*/
210+
uint64_t subinterp_pool_get_generation(void);
211+
166212
#endif /* HAVE_SUBINTERPRETERS */
167213

168214
#endif /* PY_SUBINTERP_POOL_H */

0 commit comments

Comments
 (0)