Skip to content

Commit bd55baf

Browse files
committed
Remove dead subinterpreter pool staleness code
Remove usage_count, marked_stale, generation tracking and related functions that were designed for the removed flush_imports feature. Since module unloading is not supported, this complexity is no longer needed. Removed: - subinterp_pool_acquire/release functions - subinterp_pool_flush_generation/get_generation functions - usage_count, marked_stale, generation fields from subinterp_slot_t - g_import_generation global Also fix ASAN crash by using textwrap instead of decimal in tests, since _decimal C extension has global state that crashes in subinterpreters.
1 parent 13e59be commit bd55baf

3 files changed

Lines changed: 1 addition & 254 deletions

File tree

c_src/py_nif.c

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -441,9 +441,6 @@ static void context_destructor(ErlNifEnv *env, void *obj) {
441441
ctx->globals = NULL;
442442
ctx->locals = NULL;
443443

444-
/* Release usage count - if slot is stale and count drops to 0,
445-
* the subinterpreter will be destroyed automatically */
446-
subinterp_pool_release(ctx->pool_slot);
447444
subinterp_pool_free(ctx->pool_slot);
448445
ctx->pool_slot = -1;
449446
ctx->destroyed = true;
@@ -4194,13 +4191,9 @@ static ERL_NIF_TERM nif_context_create(ErlNifEnv *env, int argc, const ERL_NIF_T
41944191

41954192
ctx->pool_slot = slot;
41964193

4197-
/* Acquire usage count for the pool slot */
4198-
subinterp_pool_acquire(slot);
4199-
42004194
/* Get the pool slot for interpreter access */
42014195
subinterp_slot_t *pool_slot = subinterp_pool_get(slot);
42024196
if (pool_slot == NULL || !pool_slot->initialized) {
4203-
subinterp_pool_release(slot); /* Release usage count */
42044197
subinterp_pool_free(slot);
42054198
close(ctx->callback_pipe[0]);
42064199
close(ctx->callback_pipe[1]);
@@ -4224,7 +4217,6 @@ static ERL_NIF_TERM nif_context_create(ErlNifEnv *env, int argc, const ERL_NIF_T
42244217
Py_XDECREF(ctx->module_cache);
42254218
PyThreadState_Swap(saved);
42264219
PyGILState_Release(gstate);
4227-
subinterp_pool_release(slot); /* Release usage count */
42284220
subinterp_pool_free(slot);
42294221
close(ctx->callback_pipe[0]);
42304222
close(ctx->callback_pipe[1]);
@@ -4355,11 +4347,7 @@ static ERL_NIF_TERM nif_context_destroy(ErlNifEnv *env, int argc, const ERL_NIF_
43554347
ctx->locals = NULL;
43564348
ctx->module_cache = NULL;
43574349

4358-
/* Release the pool slot back to the pool.
4359-
* subinterp_pool_release decrements usage count and may destroy
4360-
* the subinterpreter if it's marked stale and count drops to 0.
4361-
* subinterp_pool_free marks the slot as available for new contexts. */
4362-
subinterp_pool_release(ctx->pool_slot);
4350+
/* Release the pool slot back to the pool */
43634351
subinterp_pool_free(ctx->pool_slot);
43644352
ctx->pool_slot = -1;
43654353

c_src/py_subinterp_pool.c

Lines changed: 0 additions & 195 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,6 @@ static int g_pool_size = 0;
4747
/** @brief Pool initialized flag */
4848
static _Atomic bool g_pool_initialized = false;
4949

50-
/** @brief Global import generation counter */
51-
static _Atomic uint64_t g_import_generation = 0;
52-
5350
/* ============================================================================
5451
* Forward declarations for functions defined in other modules
5552
* ============================================================================ */
@@ -199,9 +196,6 @@ int subinterp_pool_init(int size) {
199196
}
200197

201198
slot->initialized = true;
202-
atomic_store(&slot->usage_count, 0);
203-
atomic_store(&slot->marked_stale, false);
204-
slot->generation = atomic_load(&g_import_generation);
205199

206200
/* Swap back to main thread state before creating next subinterpreter */
207201
PyThreadState_Swap(main_tstate);
@@ -351,193 +345,4 @@ bool subinterp_pool_is_initialized(void) {
351345
return atomic_load(&g_pool_initialized);
352346
}
353347

354-
void subinterp_pool_acquire(int slot) {
355-
if (slot < 0 || slot >= g_pool_size) {
356-
return;
357-
}
358-
359-
subinterp_slot_t *s = &g_subinterp_pool[slot];
360-
if (!s->initialized) {
361-
return;
362-
}
363-
364-
atomic_fetch_add(&s->usage_count, 1);
365-
}
366-
367-
/**
368-
* @brief Reinitialize a pool slot with a fresh subinterpreter
369-
*
370-
* Called after destroying a stale subinterpreter to create a fresh one.
371-
* Assumes GIL is already held.
372-
*
373-
* @param slot_idx Slot index
374-
* @return 0 on success, -1 on failure
375-
*/
376-
static int subinterp_pool_reinit_slot(int slot_idx) {
377-
subinterp_slot_t *slot = &g_subinterp_pool[slot_idx];
378-
379-
/* Save main thread state */
380-
PyThreadState *main_tstate = PyThreadState_Get();
381-
382-
/* Configure subinterpreter with SHARED GIL */
383-
PyInterpreterConfig config = {
384-
.use_main_obmalloc = 0,
385-
.allow_fork = 0,
386-
.allow_exec = 0,
387-
.allow_threads = 1,
388-
.allow_daemon_threads = 0,
389-
.check_multi_interp_extensions = 1,
390-
.gil = PyInterpreterConfig_SHARED_GIL,
391-
};
392-
393-
PyThreadState *tstate = NULL;
394-
PyStatus status = Py_NewInterpreterFromConfig(&tstate, &config);
395-
396-
if (PyStatus_Exception(status) || tstate == NULL) {
397-
fprintf(stderr, "subinterp_pool_reinit_slot: failed to create subinterp for slot %d\n", slot_idx);
398-
PyThreadState_Swap(main_tstate);
399-
return -1;
400-
}
401-
402-
/* tstate is now the current thread state for the new interpreter */
403-
slot->interp = PyThreadState_GetInterpreter(tstate);
404-
slot->tstate = tstate;
405-
406-
/* Initialize globals/locals */
407-
slot->globals = PyDict_New();
408-
slot->locals = PyDict_New();
409-
slot->module_cache = PyDict_New();
410-
411-
if (slot->globals == NULL || slot->locals == NULL || slot->module_cache == NULL) {
412-
Py_XDECREF(slot->module_cache);
413-
Py_XDECREF(slot->globals);
414-
Py_XDECREF(slot->locals);
415-
Py_EndInterpreter(tstate);
416-
PyThreadState_Swap(main_tstate);
417-
return -1;
418-
}
419-
420-
/* Import __builtins__ into globals */
421-
PyObject *builtins = PyEval_GetBuiltins();
422-
PyDict_SetItemString(slot->globals, "__builtins__", builtins);
423-
424-
/* Create erlang module in this subinterpreter */
425-
if (create_erlang_module() >= 0) {
426-
/* Register ReactorBuffer and PyBuffer */
427-
ReactorBuffer_register_with_reactor();
428-
PyBuffer_register_with_module();
429-
430-
/* Import erlang module into globals */
431-
PyObject *erlang_module = PyImport_ImportModule("erlang");
432-
if (erlang_module != NULL) {
433-
PyDict_SetItemString(slot->globals, "erlang", erlang_module);
434-
Py_DECREF(erlang_module);
435-
}
436-
}
437-
438-
/* Initialize event loop for this subinterpreter */
439-
init_subinterpreter_event_loop(NULL);
440-
441-
slot->initialized = true;
442-
atomic_store(&slot->usage_count, 0);
443-
atomic_store(&slot->marked_stale, false);
444-
slot->generation = atomic_load(&g_import_generation);
445-
446-
/* Swap back to main thread state */
447-
PyThreadState_Swap(main_tstate);
448-
449-
#ifdef DEBUG
450-
fprintf(stderr, "subinterp_pool_reinit_slot: reinitialized slot %d\n", slot_idx);
451-
#endif
452-
453-
return 0;
454-
}
455-
456-
void subinterp_pool_release(int slot) {
457-
if (slot < 0 || slot >= g_pool_size) {
458-
return;
459-
}
460-
461-
subinterp_slot_t *s = &g_subinterp_pool[slot];
462-
if (!s->initialized) {
463-
return;
464-
}
465-
466-
int prev_count = atomic_fetch_sub(&s->usage_count, 1);
467-
468-
/* If usage drops to 0 and slot is marked stale, destroy and reinitialize
469-
* the subinterpreter. We need the GIL to safely do Python operations. */
470-
if (prev_count == 1 && atomic_load(&s->marked_stale)) {
471-
/* Acquire the GIL */
472-
PyGILState_STATE gstate = PyGILState_Ensure();
473-
474-
/* Save current thread state and swap to the subinterpreter */
475-
PyThreadState *saved = PyThreadState_Swap(s->tstate);
476-
477-
/* Clean up Python objects */
478-
Py_XDECREF(s->module_cache);
479-
Py_XDECREF(s->globals);
480-
Py_XDECREF(s->locals);
481-
482-
/* End the interpreter - this frees s->tstate */
483-
Py_EndInterpreter(s->tstate);
484-
485-
/* Clear slot state */
486-
s->interp = NULL;
487-
s->tstate = NULL;
488-
s->globals = NULL;
489-
s->locals = NULL;
490-
s->module_cache = NULL;
491-
s->initialized = false;
492-
atomic_store(&s->marked_stale, false);
493-
494-
/* Swap back to saved thread state (may be NULL if we didn't have one) */
495-
PyThreadState_Swap(saved);
496-
497-
/* Reinitialize the slot with a fresh subinterpreter */
498-
if (subinterp_pool_reinit_slot(slot) < 0) {
499-
/* Reinit failed - clear allocation bit so slot is skipped */
500-
uint64_t mask = ~(1ULL << slot);
501-
atomic_fetch_and(&g_pool_allocation, mask);
502-
fprintf(stderr, "subinterp_pool_release: failed to reinit slot %d\n", slot);
503-
}
504-
/* If reinit succeeded, slot stays allocated but now has fresh interp */
505-
506-
/* Release the GIL */
507-
PyGILState_Release(gstate);
508-
509-
#ifdef DEBUG
510-
fprintf(stderr, "subinterp_pool_release: recycled stale slot %d\n", slot);
511-
#endif
512-
}
513-
}
514-
515-
uint64_t subinterp_pool_flush_generation(void) {
516-
if (!atomic_load(&g_pool_initialized)) {
517-
return 0;
518-
}
519-
520-
/* Increment generation */
521-
uint64_t new_gen = atomic_fetch_add(&g_import_generation, 1) + 1;
522-
523-
/* Mark all initialized slots as stale */
524-
for (int i = 0; i < g_pool_size; i++) {
525-
subinterp_slot_t *slot = &g_subinterp_pool[i];
526-
if (slot->initialized) {
527-
atomic_store(&slot->marked_stale, true);
528-
}
529-
}
530-
531-
#ifdef DEBUG
532-
fprintf(stderr, "subinterp_pool_flush_generation: incremented to %llu, marked all slots stale\n",
533-
(unsigned long long)new_gen);
534-
#endif
535-
536-
return new_gen;
537-
}
538-
539-
uint64_t subinterp_pool_get_generation(void) {
540-
return atomic_load(&g_import_generation);
541-
}
542-
543348
#endif /* HAVE_SUBINTERPRETERS */

c_src/py_subinterp_pool.h

Lines changed: 0 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -84,15 +84,6 @@ typedef struct {
8484

8585
/** @brief Whether this slot is initialized and ready for use */
8686
bool initialized;
87-
88-
/** @brief Number of contexts currently using this slot */
89-
_Atomic int usage_count;
90-
91-
/** @brief Marked for destruction when usage_count reaches 0 */
92-
_Atomic bool marked_stale;
93-
94-
/** @brief Import generation when slot was created */
95-
uint64_t generation;
9687
} subinterp_slot_t;
9788

9889
/**
@@ -172,43 +163,6 @@ void subinterp_pool_shutdown(void);
172163
*/
173164
bool subinterp_pool_is_initialized(void);
174165

175-
/**
176-
* @brief Increment usage count for a slot
177-
*
178-
* Called when a context starts using a pool slot.
179-
*
180-
* @param slot Slot index
181-
*/
182-
void subinterp_pool_acquire(int slot);
183-
184-
/**
185-
* @brief Decrement usage count for a slot
186-
*
187-
* Called when a context stops using a pool slot.
188-
* If the slot is marked stale and usage_count reaches 0,
189-
* the subinterpreter is destroyed and the slot is cleared.
190-
*
191-
* @param slot Slot index
192-
*/
193-
void subinterp_pool_release(int slot);
194-
195-
/**
196-
* @brief Increment import generation and mark all pool slots stale
197-
*
198-
* Called by flush_imports to trigger subinterpreter replacement.
199-
* Stale slots will be destroyed when their usage_count reaches 0.
200-
*
201-
* @return The new generation number
202-
*/
203-
uint64_t subinterp_pool_flush_generation(void);
204-
205-
/**
206-
* @brief Get the current import generation
207-
*
208-
* @return Current generation number
209-
*/
210-
uint64_t subinterp_pool_get_generation(void);
211-
212166
#endif /* HAVE_SUBINTERPRETERS */
213167

214168
#endif /* PY_SUBINTERP_POOL_H */

0 commit comments

Comments
 (0)