Skip to content

Commit bf2a88b

Browse files
committed
Optimize nif_process_ready_tasks for ~15% performance improvement
- Replace asyncio.iscoroutine() with PyCoro_CheckExact C API (~2-3us/task)
- Use stack buffers for module/func strings instead of enif_alloc (~0.5-1us/task)
- Cache asyncio.events module alongside cached_asyncio (~1-2us/call)
- Pool ErlNifEnv allocations with 64-entry pool using enif_clear_env (~1-2us/task)

Target: reduce from ~53us to ~45-47us per call in submit_task flow.
1 parent e631349 commit bf2a88b

File tree

2 files changed

+121
-61
lines changed

2 files changed

+121
-61
lines changed

c_src/py_event_loop.c

Lines changed: 107 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,15 @@ void event_loop_destructor(ErlNifEnv *env, void *obj) {
430430
loop->task_queue = NULL;
431431
}
432432

433+
/* Clean up ErlNifEnv pool */
434+
for (int i = 0; i < loop->env_pool_count; i++) {
435+
if (loop->env_pool[i] != NULL) {
436+
enif_free_env(loop->env_pool[i]);
437+
loop->env_pool[i] = NULL;
438+
}
439+
}
440+
loop->env_pool_count = 0;
441+
433442
/* Release Python loop reference if held */
434443
if (loop->py_loop_valid && loop->py_loop != NULL) {
435444
/* Only decref if Python runtime is still running and we can safely acquire GIL */
@@ -442,8 +451,10 @@ void event_loop_destructor(ErlNifEnv *env, void *obj) {
442451
if (loop->py_cache_valid) {
443452
Py_XDECREF(loop->cached_asyncio);
444453
Py_XDECREF(loop->cached_run_and_send);
454+
Py_XDECREF(loop->cached_events_module);
445455
loop->cached_asyncio = NULL;
446456
loop->cached_run_and_send = NULL;
457+
loop->cached_events_module = NULL;
447458
loop->py_cache_valid = false;
448459
}
449460
/* Clear callable cache */
@@ -1133,12 +1144,17 @@ ERL_NIF_TERM nif_event_loop_new(ErlNifEnv *env, int argc,
11331144
/* Initialize Python cache (uvloop-style optimization) */
11341145
loop->cached_asyncio = NULL;
11351146
loop->cached_run_and_send = NULL;
1147+
loop->cached_events_module = NULL;
11361148
loop->py_cache_valid = false;
11371149

11381150
/* Initialize callable cache */
11391151
memset(loop->callable_cache, 0, sizeof(loop->callable_cache));
11401152
loop->callable_cache_count = 0;
11411153

1154+
/* Initialize ErlNifEnv pool (empty initially, populated on demand) */
1155+
memset(loop->env_pool, 0, sizeof(loop->env_pool));
1156+
loop->env_pool_count = 0;
1157+
11421158
/* Initialize per-process namespace registry */
11431159
loop->namespaces_head = NULL;
11441160
loop->pid_env_head = NULL;
@@ -2687,6 +2703,32 @@ typedef struct {
26872703
ERL_NIF_TERM task_term;
26882704
} dequeued_task_t;
26892705

2706+
/**
2707+
* Get a pooled ErlNifEnv or allocate a new one.
2708+
* This amortizes allocation overhead by reusing cleared environments.
2709+
*/
2710+
static inline ErlNifEnv *get_pooled_env(erlang_event_loop_t *loop) {
2711+
if (loop->env_pool_count > 0) {
2712+
loop->env_pool_count--;
2713+
return loop->env_pool[loop->env_pool_count];
2714+
}
2715+
return enif_alloc_env();
2716+
}
2717+
2718+
/**
2719+
* Return an ErlNifEnv to the pool (or free it if pool is full).
2720+
* The env is cleared before being returned to the pool.
2721+
*/
2722+
static inline void return_pooled_env(erlang_event_loop_t *loop, ErlNifEnv *term_env) {
2723+
if (loop->env_pool_count < ENV_POOL_SIZE) {
2724+
enif_clear_env(term_env);
2725+
loop->env_pool[loop->env_pool_count] = term_env;
2726+
loop->env_pool_count++;
2727+
} else {
2728+
enif_free_env(term_env);
2729+
}
2730+
}
2731+
26902732
/**
26912733
* process_ready_tasks(LoopRef) -> ok | {error, Reason}
26922734
*
@@ -2770,15 +2812,15 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc,
27702812
task_bin.size = iov[0].iov_len;
27712813

27722814
/* Deserialize task tuple (NIF operation, no GIL needed) */
2773-
ErlNifEnv *term_env = enif_alloc_env();
2815+
ErlNifEnv *term_env = get_pooled_env(loop);
27742816
if (term_env == NULL) {
27752817
break; /* Will process what we have so far */
27762818
}
27772819

27782820
ERL_NIF_TERM task_term;
27792821
if (enif_binary_to_term(term_env, task_bin.data, task_bin.size,
27802822
&task_term, 0) == 0) {
2781-
enif_free_env(term_env);
2823+
return_pooled_env(loop, term_env);
27822824
/* Dequeue and skip this malformed task */
27832825
enif_ioq_deq(loop->task_queue, iov[0].iov_len, NULL);
27842826
atomic_fetch_sub(&loop->task_count, 1);
@@ -2812,22 +2854,24 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc,
28122854
* Avoids PyImport_ImportModule on every call */
28132855
PyObject *asyncio;
28142856
PyObject *run_and_send;
2857+
PyObject *events_module;
28152858

2816-
/* For thread-local event loop context (dirty NIF scheduler workaround) */
2817-
PyObject *events_module = NULL;
2859+
/* Per-call state for thread-local event loop context */
28182860
PyObject *old_running_loop = NULL;
28192861

2820-
if (loop->py_cache_valid && loop->cached_asyncio != NULL && loop->cached_run_and_send != NULL) {
2862+
if (loop->py_cache_valid && loop->cached_asyncio != NULL &&
2863+
loop->cached_run_and_send != NULL && loop->cached_events_module != NULL) {
28212864
/* Use cached references */
28222865
asyncio = loop->cached_asyncio;
28232866
run_and_send = loop->cached_run_and_send;
2867+
events_module = loop->cached_events_module;
28242868
} else {
28252869
/* First call or cache invalidated - populate cache */
28262870
asyncio = PyImport_ImportModule("asyncio");
28272871
if (asyncio == NULL) {
28282872
/* Cleanup dequeued tasks */
28292873
for (int i = 0; i < num_tasks; i++) {
2830-
enif_free_env(tasks[i].term_env);
2874+
return_pooled_env(loop, tasks[i].term_env);
28312875
}
28322876
PyGILState_Release(gstate);
28332877
return make_error(env, "asyncio_import_failed");
@@ -2844,7 +2888,7 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc,
28442888
if (erlang_loop_mod == NULL) {
28452889
Py_DECREF(asyncio);
28462890
for (int i = 0; i < num_tasks; i++) {
2847-
enif_free_env(tasks[i].term_env);
2891+
return_pooled_env(loop, tasks[i].term_env);
28482892
}
28492893
PyGILState_Release(gstate);
28502894
return make_error(env, "erlang_loop_import_failed");
@@ -2855,15 +2899,29 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc,
28552899
if (run_and_send == NULL) {
28562900
Py_DECREF(asyncio);
28572901
for (int i = 0; i < num_tasks; i++) {
2858-
enif_free_env(tasks[i].term_env);
2902+
return_pooled_env(loop, tasks[i].term_env);
28592903
}
28602904
PyGILState_Release(gstate);
28612905
return make_error(env, "run_and_send_not_found");
28622906
}
28632907

2908+
/* Import asyncio.events for running loop context management */
2909+
events_module = PyImport_ImportModule("asyncio.events");
2910+
if (events_module == NULL) {
2911+
PyErr_Clear();
2912+
Py_DECREF(asyncio);
2913+
Py_DECREF(run_and_send);
2914+
for (int i = 0; i < num_tasks; i++) {
2915+
return_pooled_env(loop, tasks[i].term_env);
2916+
}
2917+
PyGILState_Release(gstate);
2918+
return make_error(env, "events_import_failed");
2919+
}
2920+
28642921
/* Store in cache */
28652922
loop->cached_asyncio = asyncio;
28662923
loop->cached_run_and_send = run_and_send;
2924+
loop->cached_events_module = events_module;
28672925
loop->py_cache_valid = true;
28682926
}
28692927

@@ -2881,7 +2939,7 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc,
28812939
if (erlang_loop_mod == NULL) {
28822940
PyErr_Clear();
28832941
for (int i = 0; i < num_tasks; i++) {
2884-
enif_free_env(tasks[i].term_env);
2942+
return_pooled_env(loop, tasks[i].term_env);
28852943
}
28862944
PyGILState_Release(gstate);
28872945
return make_error(env, "loop_module_import_failed");
@@ -2892,7 +2950,7 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc,
28922950
if (loop_class == NULL) {
28932951
PyErr_Clear();
28942952
for (int i = 0; i < num_tasks; i++) {
2895-
enif_free_env(tasks[i].term_env);
2953+
return_pooled_env(loop, tasks[i].term_env);
28962954
}
28972955
PyGILState_Release(gstate);
28982956
return make_error(env, "loop_class_not_found");
@@ -2903,7 +2961,7 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc,
29032961
if (new_loop == NULL) {
29042962
PyErr_Clear();
29052963
for (int i = 0; i < num_tasks; i++) {
2906-
enif_free_env(tasks[i].term_env);
2964+
return_pooled_env(loop, tasks[i].term_env);
29072965
}
29082966
PyGILState_Release(gstate);
29092967
return make_error(env, "loop_creation_failed");
@@ -2936,22 +2994,20 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc,
29362994
* and the running loop for this thread.
29372995
*
29382996
* This mirrors what Python's asyncio.run() does internally (see _loop.py).
2997+
* Note: events_module is already cached, no need to import again.
29392998
* ======================================================================== */
2940-
events_module = PyImport_ImportModule("asyncio.events");
2941-
if (events_module != NULL) {
2942-
/* Set our loop as current event loop for this thread */
2943-
PyObject *set_result = PyObject_CallMethod(asyncio, "set_event_loop", "O", loop->py_loop);
2944-
Py_XDECREF(set_result);
2999+
/* Set our loop as current event loop for this thread */
3000+
PyObject *set_result = PyObject_CallMethod(asyncio, "set_event_loop", "O", loop->py_loop);
3001+
Py_XDECREF(set_result);
29453002

2946-
/* Save and set running loop (needed for asyncio.Task creation) */
2947-
old_running_loop = PyObject_CallMethod(events_module, "_get_running_loop", NULL);
2948-
if (old_running_loop == NULL) {
2949-
PyErr_Clear();
2950-
old_running_loop = Py_NewRef(Py_None);
2951-
}
2952-
PyObject *set_running = PyObject_CallMethod(events_module, "_set_running_loop", "O", loop->py_loop);
2953-
Py_XDECREF(set_running);
3003+
/* Save and set running loop (needed for asyncio.Task creation) */
3004+
old_running_loop = PyObject_CallMethod(events_module, "_get_running_loop", NULL);
3005+
if (old_running_loop == NULL) {
3006+
PyErr_Clear();
3007+
old_running_loop = Py_NewRef(Py_None);
29543008
}
3009+
PyObject *set_running = PyObject_CallMethod(events_module, "_set_running_loop", "O", loop->py_loop);
3010+
Py_XDECREF(set_running);
29553011

29563012
/* Process all dequeued tasks */
29573013
ERL_NIF_TERM result = ATOM_OK;
@@ -2966,33 +3022,31 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc,
29663022
const ERL_NIF_TERM *tuple_elems;
29673023
if (!enif_get_tuple(term_env, task_term, &arity, &tuple_elems) ||
29683024
arity != 6) {
2969-
enif_free_env(term_env);
3025+
return_pooled_env(loop, term_env);
29703026
continue;
29713027
}
29723028

29733029
ErlNifPid caller_pid;
29743030
if (!enif_get_local_pid(term_env, tuple_elems[0], &caller_pid)) {
2975-
enif_free_env(term_env);
3031+
return_pooled_env(loop, term_env);
29763032
continue;
29773033
}
29783034

29793035
ErlNifBinary module_bin, func_bin;
29803036
if (!enif_inspect_binary(term_env, tuple_elems[2], &module_bin) ||
29813037
!enif_inspect_binary(term_env, tuple_elems[3], &func_bin)) {
2982-
enif_free_env(term_env);
3038+
return_pooled_env(loop, term_env);
29833039
continue;
29843040
}
29853041

29863042
/* Look up env by PID (registered via submit_task_with_env) */
29873043
py_env_resource_t *task_env = (py_env_resource_t *)lookup_pid_env(loop, &caller_pid);
29883044

2989-
/* Convert module/func to C strings */
2990-
char *module_name = enif_alloc(module_bin.size + 1);
2991-
char *func_name = enif_alloc(func_bin.size + 1);
2992-
if (module_name == NULL || func_name == NULL) {
2993-
enif_free(module_name);
2994-
enif_free(func_name);
2995-
enif_free_env(term_env);
3045+
/* Convert module/func to C strings (stack buffers to avoid alloc overhead) */
3046+
char module_name[CALLABLE_NAME_MAX];
3047+
char func_name[CALLABLE_NAME_MAX];
3048+
if (module_bin.size >= CALLABLE_NAME_MAX || func_bin.size >= CALLABLE_NAME_MAX) {
3049+
return_pooled_env(loop, term_env);
29963050
continue;
29973051
}
29983052
memcpy(module_name, module_bin.data, module_bin.size);
@@ -3022,19 +3076,16 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc,
30223076
func = get_function_for_task(loop, ns, module_name, func_name);
30233077
}
30243078

3025-
enif_free(module_name);
3026-
enif_free(func_name);
3027-
30283079
if (func == NULL) {
3029-
enif_free_env(term_env);
3080+
return_pooled_env(loop, term_env);
30303081
continue;
30313082
}
30323083

30333084
/* Convert args list to Python tuple */
30343085
unsigned int args_len;
30353086
if (!enif_get_list_length(term_env, tuple_elems[4], &args_len)) {
30363087
Py_DECREF(func);
3037-
enif_free_env(term_env);
3088+
return_pooled_env(loop, term_env);
30383089
continue;
30393090
}
30403091

@@ -3055,7 +3106,7 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc,
30553106
if (!args_ok) {
30563107
Py_DECREF(args);
30573108
Py_DECREF(func);
3058-
enif_free_env(term_env);
3109+
return_pooled_env(loop, term_env);
30593110
continue;
30603111
}
30613112

@@ -3081,21 +3132,19 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc,
30813132

30823133
if (coro == NULL) {
30833134
PyErr_Clear();
3084-
enif_free_env(term_env);
3135+
return_pooled_env(loop, term_env);
30853136
continue;
30863137
}
30873138

3088-
/* Check if result is a coroutine */
3089-
PyObject *iscoroutine = PyObject_CallMethod(asyncio, "iscoroutine", "O", coro);
3090-
bool is_coro = iscoroutine != NULL && PyObject_IsTrue(iscoroutine);
3091-
Py_XDECREF(iscoroutine);
3139+
/* Check if result is a coroutine (direct C API, avoids method call overhead) */
3140+
bool is_coro = PyCoro_CheckExact(coro);
30923141

30933142
/* Create caller PID object */
30943143
extern PyTypeObject ErlangPidType;
30953144
ErlangPidObject *pid_obj = PyObject_New(ErlangPidObject, &ErlangPidType);
30963145
if (pid_obj == NULL) {
30973146
Py_DECREF(coro);
3098-
enif_free_env(term_env);
3147+
return_pooled_env(loop, term_env);
30993148
continue;
31003149
}
31013150
/* Copy PID */
@@ -3107,7 +3156,7 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc,
31073156
PyErr_Clear();
31083157
Py_DECREF((PyObject *)pid_obj);
31093158
Py_DECREF(coro);
3110-
enif_free_env(term_env);
3159+
return_pooled_env(loop, term_env);
31113160
continue;
31123161
}
31133162

@@ -3155,7 +3204,7 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc,
31553204

31563205
Py_DECREF(py_ref);
31573206
Py_DECREF((PyObject *)pid_obj);
3158-
enif_free_env(term_env);
3207+
return_pooled_env(loop, term_env);
31593208
}
31603209

31613210
/* NOTE: We don't DECREF asyncio and run_and_send here because they're cached
@@ -3173,11 +3222,9 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc,
31733222
/* Loop is already running - just signal it and clean up.
31743223
* The pending events were already added by dispatch_timer/handle_fd_event,
31753224
* and the condition variable was signaled. The running loop will wake up
3176-
* and process them. */
3177-
if (events_module != NULL) {
3178-
Py_XDECREF(old_running_loop);
3179-
Py_DECREF(events_module);
3180-
}
3225+
* and process them.
3226+
* Note: events_module is cached, so we don't DECREF it. */
3227+
Py_XDECREF(old_running_loop);
31813228
PyGILState_Release(gstate);
31823229
return ATOM_OK;
31833230
}
@@ -3229,14 +3276,12 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc,
32293276
coros_scheduled = 0; /* Already processed on first iteration */
32303277
}
32313278

3232-
/* Restore original event loop context before releasing GIL */
3233-
if (events_module != NULL) {
3234-
PyObject *restore = PyObject_CallMethod(events_module, "_set_running_loop", "O",
3235-
old_running_loop ? old_running_loop : Py_None);
3236-
Py_XDECREF(restore);
3237-
Py_XDECREF(old_running_loop);
3238-
Py_DECREF(events_module);
3239-
}
3279+
/* Restore original event loop context before releasing GIL.
3280+
* Note: events_module is cached, so we don't DECREF it here. */
3281+
PyObject *restore = PyObject_CallMethod(events_module, "_set_running_loop", "O",
3282+
old_running_loop ? old_running_loop : Py_None);
3283+
Py_XDECREF(restore);
3284+
Py_XDECREF(old_running_loop);
32403285

32413286
PyGILState_Release(gstate);
32423287

@@ -6946,6 +6991,7 @@ static PyObject *py_loop_new(PyObject *self, PyObject *args) {
69466991
/* Initialize Python cache (uvloop-style optimization) */
69476992
loop->cached_asyncio = NULL;
69486993
loop->cached_run_and_send = NULL;
6994+
loop->cached_events_module = NULL;
69496995
loop->py_cache_valid = false;
69506996

69516997
#ifdef HAVE_SUBINTERPRETERS

0 commit comments

Comments
 (0)