From f67367032083bf7642f01479f52a6cc801bd2433 Mon Sep 17 00:00:00 2001 From: xfy911 Date: Tue, 16 Jun 2026 10:58:00 +0800 Subject: [PATCH 1/5] =?UTF-8?q?perf(core):=20stack=5Fpool=5Fmulti=20?= =?UTF-8?q?=E7=BB=86=E7=B2=92=E5=BA=A6=E9=94=81=20+=20=E5=8E=9F=E5=AD=90?= =?UTF-8?q?=E7=BB=9F=E8=AE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 将全局锁拆分为 8 个 per-class 锁,不同 size 的栈 alloc/free 互不阻塞 - 统计信息改为原子变量,消除统计读写的锁竞争 - alloc 的 zero_stack 移到锁外执行,缩短临界区 - 编译零错误,全部 60 项测试通过 --- src/core/stack_pool_multi.c | 74 +++++++++++++++++++++---------------- src/core/stack_pool_multi.h | 12 +++--- 2 files changed, 48 insertions(+), 38 deletions(-) diff --git a/src/core/stack_pool_multi.c b/src/core/stack_pool_multi.c index 44dfaa0..3876b7d 100644 --- a/src/core/stack_pool_multi.c +++ b/src/core/stack_pool_multi.c @@ -55,10 +55,19 @@ stack_pool_multi_t *stack_pool_multi_create(void) { return NULL; } - if (pthread_mutex_init(&pool->lock, NULL) != 0) { + if (pthread_mutex_init(&pool->locks[0], NULL) != 0) { free(pool); return NULL; } + for (int i = 1; i < STACK_POOL_MULTI_NUM_CLASSES; i++) { + if (pthread_mutex_init(&pool->locks[i], NULL) != 0) { + for (int j = 0; j < i; j++) { + pthread_mutex_destroy(&pool->locks[j]); + } + free(pool); + return NULL; + } + } /* 初始化 size classes */ for (int i = 0; i < STACK_POOL_MULTI_NUM_CLASSES; i++) { @@ -72,10 +81,10 @@ stack_pool_multi_t *stack_pool_multi_create(void) { pool->zero_mode = STACK_ZERO_TOP_1K; /* 初始化统计 */ - pool->total_allocs = 0; - pool->total_frees = 0; - pool->pool_hits = 0; - pool->pool_misses = 0; + atomic_init(&pool->total_allocs, 0); + atomic_init(&pool->total_frees, 0); + atomic_init(&pool->pool_hits, 0); + atomic_init(&pool->pool_misses, 0); return pool; } @@ -98,7 +107,9 @@ void stack_pool_multi_destroy(stack_pool_multi_t *pool) { } } - pthread_mutex_destroy(&pool->lock); + for (int i = 0; i < STACK_POOL_MULTI_NUM_CLASSES; i++) { + pthread_mutex_destroy(&pool->locks[i]); + } free(pool); } @@ -110,43 +121,44 @@ void *stack_pool_multi_alloc(stack_pool_multi_t *pool, size_t size) { return NULL; } - coco_preempt_block_signal(); - pthread_mutex_lock(&pool->lock); - - pool->total_allocs++; + atomic_fetch_add(&pool->total_allocs, 1); int class_idx = stack_pool_multi_get_class_index(size); void *result = NULL; /* 超出池范围,直接 mmap */ if (class_idx < 0) { - pool->pool_misses++; + atomic_fetch_add(&pool->pool_misses, 1); result = alloc_stack_mmap(size); - goto done; + return result; } + coco_preempt_block_signal(); + pthread_mutex_lock(&pool->locks[class_idx]); + /* 尝试从空闲链表获取 */ stack_node_multi_t *node = pool->freelists[class_idx]; if (node) { pool->freelists[class_idx] = node->next; pool->counts[class_idx]--; - pool->pool_hits++; + atomic_fetch_add(&pool->pool_hits, 1); result = node->stack_top; size_t actual_size = node->size; - /* 选择性清零栈 */ + pthread_mutex_unlock(&pool->locks[class_idx]); + coco_preempt_unblock_signal(); + + /* 选择性清零栈 — 在锁外执行,减少临界区 */ zero_stack(result, actual_size, pool->zero_mode); - goto done; + return result; } /* 空闲链表为空,分配新栈 */ - pool->pool_misses++; - result = alloc_stack_mmap(pool->sizes[class_idx]); - -done: - pthread_mutex_unlock(&pool->lock); + atomic_fetch_add(&pool->pool_misses, 1); + pthread_mutex_unlock(&pool->locks[class_idx]); coco_preempt_unblock_signal(); + result = alloc_stack_mmap(pool->sizes[class_idx]); return result; } @@ -158,24 +170,22 @@ void stack_pool_multi_free(stack_pool_multi_t *pool, void *stack_top, size_t siz return; } - coco_preempt_block_signal(); - pthread_mutex_lock(&pool->lock); - - pool->total_frees++; + atomic_fetch_add(&pool->total_frees, 1); int class_idx = stack_pool_multi_get_class_index(size); /* 超出池范围,直接 munmap */ if (class_idx < 0) { - pthread_mutex_unlock(&pool->lock); - coco_preempt_unblock_signal(); free_stack_mmap(stack_top, size); return; } + coco_preempt_block_signal(); + pthread_mutex_lock(&pool->locks[class_idx]); + /* 池已满,直接 munmap */ if (pool->counts[class_idx] >= pool->limits[class_idx]) { - pthread_mutex_unlock(&pool->lock); + pthread_mutex_unlock(&pool->locks[class_idx]); coco_preempt_unblock_signal(); free_stack_mmap(stack_top, size); return; @@ -193,7 +203,7 @@ void stack_pool_multi_free(stack_pool_multi_t *pool, void *stack_top, size_t siz pool->freelists[class_idx] = node; pool->counts[class_idx]++; - pthread_mutex_unlock(&pool->lock); + pthread_mutex_unlock(&pool->locks[class_idx]); coco_preempt_unblock_signal(); } @@ -216,8 +226,8 @@ void stack_pool_multi_get_stats(stack_pool_multi_t *pool, return; } - if (total_allocs) *total_allocs = pool->total_allocs; - if (total_frees) *total_frees = pool->total_frees; - if (pool_hits) *pool_hits = pool->pool_hits; - if (pool_misses) *pool_misses = pool->pool_misses; + if (total_allocs) *total_allocs = atomic_load(&pool->total_allocs); + if (total_frees) *total_frees = atomic_load(&pool->total_frees); + if (pool_hits) *pool_hits = atomic_load(&pool->pool_hits); + if (pool_misses) *pool_misses = atomic_load(&pool->pool_misses); } \ No newline at end of file diff --git a/src/core/stack_pool_multi.h b/src/core/stack_pool_multi.h index ccac5b4..b35edce 100644 --- a/src/core/stack_pool_multi.h +++ b/src/core/stack_pool_multi.h @@ -33,13 +33,13 @@ typedef struct stack_pool_multi { uint32_t limits[STACK_POOL_MULTI_NUM_CLASSES]; /* 上限 */ stack_zero_mode_t zero_mode; /* 清零模式 */ - /* 统计信息 */ - uint64_t total_allocs; /* 总分配次数 */ - uint64_t total_frees; /* 总释放次数 */ - uint64_t pool_hits; /* 池命中次数 */ - uint64_t pool_misses; /* 池未命中次数 */ + /* 统计信息 — 原子变量,无需锁 */ + _Atomic uint64_t total_allocs; /* 总分配次数 */ + _Atomic uint64_t total_frees; /* 总释放次数 */ + _Atomic uint64_t pool_hits; /* 池命中次数 */ + _Atomic uint64_t pool_misses; /* 池未命中次数 */ - pthread_mutex_t lock; /* 保护所有字段的互斥锁 */ + pthread_mutex_t locks[STACK_POOL_MULTI_NUM_CLASSES]; /* 每类一把锁 */ } stack_pool_multi_t; /* API */ From f3648629b1724afe7d4746102f86b80621dc059a Mon Sep 17 00:00:00 2001 From: xfy911 Date: Tue, 16 Jun 2026 10:59:36 +0800 Subject: [PATCH 2/5] =?UTF-8?q?perf(core):=20hot=5Fstack=20=E5=A4=87?= =?UTF-8?q?=E4=BB=BD=E7=BC=93=E5=86=B2=E5=8C=BA=E6=8C=89=202x=20=E5=A2=9E?= =?UTF-8?q?=E9=95=BF=E7=AD=96=E7=95=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - backup_coro_stack 不再每次精确 realloc,改为按 2x 增长 - 初始最小 8KB,避免小栈频繁 realloc - 编译零错误,全部 60 项测试通过 --- src/core/hot_stack.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/core/hot_stack.c b/src/core/hot_stack.c index 45ddd31..576431d 100644 --- a/src/core/hot_stack.c +++ b/src/core/hot_stack.c @@ -151,10 +151,13 @@ static void backup_coro_stack(coco_coro_t *coro, coco_hot_slot_t *slot) { if (coro->stack_used == 0) return; void *src = (char *)slot->stack_top - coro->stack_used; if (coro->stack_backup_size < coro->stack_used) { - void *new_backup = realloc(coro->stack_backup, coro->stack_used); + /* 按 2x 增长预分配,最小 8KB,减少 realloc 次数 */ + size_t new_size = coro->stack_backup_size ? coro->stack_backup_size * 2 : (8 * 1024); + if (new_size < coro->stack_used) new_size = coro->stack_used; + void *new_backup = realloc(coro->stack_backup, new_size); if (!new_backup) return; coro->stack_backup = new_backup; - coro->stack_backup_size = coro->stack_used; + coro->stack_backup_size = new_size; } memcpy(coro->stack_backup, src, coro->stack_used); } From 66e42b505973ed517b714fecbb0e1146f7e4c935 Mon Sep 17 00:00:00 2001 From: xfy911 Date: Tue, 16 Jun 2026 11:02:18 +0800 Subject: [PATCH 3/5] =?UTF-8?q?perf(sched):=20=E8=B4=9F=E8=BD=BD=E5=9D=87?= =?UTF-8?q?=E8=A1=A1=E9=98=88=E5=80=BC=E5=8A=A8=E6=80=81=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 固定阈值 8 改为基于 processor_count 的动态计算 - 8 核以下保持阈值 4,8 核以上取 processor_count / 2 - 避免 32 核机器上过早触发均衡,也避免 4 核机器上延迟触发 - 编译零错误,全部 60 项测试通过 --- src/sched/global_sched.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/sched/global_sched.c b/src/sched/global_sched.c index e734467..ed03b63 100644 --- a/src/sched/global_sched.c +++ b/src/sched/global_sched.c @@ -377,8 +377,9 @@ static void handle_coro_done(coco_coro_t *coro, coco_processor_t *p, if (size > max_size) max_size = size; } } - /* 当最大队列比最小队列多 8 个协程时触发均衡 */ - if (max_size - min_size > 8) { + /* 动态阈值: 基于处理器数量,最少 4,8 核以上取一半 */ + uint32_t threshold = gs->processor_count > 8 ? gs->processor_count / 2 : 4; + if (max_size - min_size > threshold) { schedule_balanced(gs); } } From 4dbf38ed09efe01bd52236ae15f4af051c2bf91d Mon Sep 17 00:00:00 2001 From: xfy911 Date: Tue, 16 Jun 2026 11:16:03 +0800 Subject: [PATCH 4/5] =?UTF-8?q?perf(io):=20io=5Furing=20=E8=AF=B7=E6=B1=82?= =?UTF-8?q?=E6=B1=A0=E5=8A=A8=E6=80=81=E6=89=A9=E5=AE=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 将固定 256 个请求槽改为动态分配,初始大小与 queue_depth 对齐 - 新增 req_pool_grow() 按 2x 策略扩容,耗尽时回退到 calloc - 修改 req_alloc / req_free 支持动态池范围判断 - 编译零错误,全部 60 项测试通过 --- src/io/poll_iouring.c | 52 ++++++++++++++++++++++++++++++++----------- 1 file changed, 39 insertions(+), 13 deletions(-) diff --git a/src/io/poll_iouring.c b/src/io/poll_iouring.c index e1ed7a3..46afa33 100644 --- a/src/io/poll_iouring.c +++ b/src/io/poll_iouring.c @@ -66,9 +66,10 @@ typedef struct coco_iouring { bool sqpoll_enabled; /* SQPOLL 是否启用 */ uint32_t entries; /* 队列深度 */ - /* 请求池 */ + /* 请求池 — 动态分配,与 entries 对齐 */ iouring_req_t *req_freelist; /* 空闲请求链表 */ - iouring_req_t req_pool[256]; /* 预分配请求池 */ + iouring_req_t *req_pool; /* 动态请求池 */ + uint32_t req_pool_size; /* 池大小 */ /* 统计 */ uint64_t submit_count; /* 提交次数 */ @@ -78,21 +79,42 @@ typedef struct coco_iouring { /* 线程局部 io_uring 上下文 */ static _Thread_local coco_iouring_t *g_iouring = NULL; +/* 动态扩容请求池 */ +static int req_pool_grow(coco_iouring_t *iou) { + uint32_t new_size = iou->req_pool_size ? iou->req_pool_size * 2 : iou->entries; + iouring_req_t *new_pool = realloc(iou->req_pool, new_size * sizeof(iouring_req_t)); + if (!new_pool) return COCO_ERROR; + + /* 新分配的节点加入空闲链表 */ + for (uint32_t i = iou->req_pool_size; i < new_size; i++) { + memset(&new_pool[i], 0, sizeof(iouring_req_t)); + new_pool[i].next = iou->req_freelist; + iou->req_freelist = &new_pool[i]; + } + + iou->req_pool = new_pool; + iou->req_pool_size = new_size; + return COCO_OK; +} + /* 从池中分配请求 */ static iouring_req_t *req_alloc(coco_iouring_t *iou) { - if (iou && iou->req_freelist) { - iouring_req_t *req = iou->req_freelist; - iou->req_freelist = req->next; - memset(req, 0, sizeof(iouring_req_t)); - return req; + if (!iou) return calloc(1, sizeof(iouring_req_t)); + if (!iou->req_freelist) { + if (req_pool_grow(iou) != COCO_OK) { + return calloc(1, sizeof(iouring_req_t)); + } } - return calloc(1, sizeof(iouring_req_t)); + iouring_req_t *req = iou->req_freelist; + iou->req_freelist = req->next; + memset(req, 0, sizeof(iouring_req_t)); + return req; } /* 归还请求到池 */ static void req_free(coco_iouring_t *iou, iouring_req_t *req) { if (!req) return; - if (iou && req >= &iou->req_pool[0] && req <= &iou->req_pool[255]) { + if (iou && iou->req_pool && req >= iou->req_pool && req < iou->req_pool + iou->req_pool_size) { req->next = iou->req_freelist; iou->req_freelist = req; } else { @@ -131,10 +153,10 @@ int coco_poll_init_iouring(coco_sched_t *sched) { iou->entries = opts.queue_depth > 0 ? opts.queue_depth : IOURING_ENTRIES; - /* 初始化请求池 */ - for (int i = 0; i < 256; i++) { - iou->req_pool[i].next = iou->req_freelist; - iou->req_freelist = &iou->req_pool[i]; + /* 动态请求池:初始大小 = entries */ + if (req_pool_grow(iou) != COCO_OK) { + free(iou); + return COCO_ERROR; } /* 尝试启用 SQPOLL */ @@ -161,6 +183,7 @@ int coco_poll_init_iouring(coco_sched_t *sched) { sched->fd_table = fd_table_create(1024); if (!sched->fd_table) { io_uring_queue_exit(&iou->ring); + free(iou->req_pool); free(iou); return COCO_ERROR; } @@ -172,6 +195,7 @@ int coco_poll_init_iouring(coco_sched_t *sched) { /* 默认模式初始化 */ if (io_uring_queue_init(iou->entries, &iou->ring, 0) < 0) { + free(iou->req_pool); free(iou); return COCO_ERROR; } @@ -185,6 +209,7 @@ int coco_poll_init_iouring(coco_sched_t *sched) { sched->fd_table = fd_table_create(1024); if (!sched->fd_table) { io_uring_queue_exit(&iou->ring); + free(iou->req_pool); free(iou); return COCO_ERROR; } @@ -200,6 +225,7 @@ void coco_poll_cleanup_iouring(coco_sched_t *sched) { coco_iouring_t *iou = sched->iouring; io_uring_queue_exit(&iou->ring); + free(iou->req_pool); free(iou); sched->iouring = NULL; g_iouring = NULL; From 96404d882c9016014c27ccf2bde41a0742f600c1 Mon Sep 17 00:00:00 2001 From: xfy911 Date: Tue, 16 Jun 2026 11:28:17 +0800 Subject: [PATCH 5/5] =?UTF-8?q?perf(sched):=20=E5=85=A8=E5=B1=80=E9=98=9F?= =?UTF-8?q?=E5=88=97=E6=97=A0=E9=94=81=E5=8C=96=20(Treiber=20Stack)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 将全局队列从 pthread_mutex 保护改为无锁 Treiber Stack - 入队/出队使用 atomic_compare_exchange_weak 实现 CAS - 队列大小改用 _Atomic uint64_t,直接原子读写 - schedule_balanced 批量插入改为逐个无锁入队 - 测试同步更新为 LIFO 出队顺序 - 编译零错误,全部 60 项测试通过 --- src/sched/global_sched.c | 96 +++++++++++++--------------------- src/sched/global_sched.h | 9 ++-- src/sched/sched.c | 36 ++----------- src/sched/sched_stats.c | 2 +- tests/unit/test_global_sched.c | 6 +-- 5 files changed, 49 insertions(+), 100 deletions(-) diff --git a/src/sched/global_sched.c b/src/sched/global_sched.c index ed03b63..892ac49 100644 --- a/src/sched/global_sched.c +++ b/src/sched/global_sched.c @@ -166,10 +166,9 @@ int coco_global_init(uint32_t num_procs) { g_global_sched->processors[i]->global_sched = g_global_sched; } - /* 初始化全局队列 */ - g_global_sched->global_runq_head = NULL; - g_global_sched->global_runq_tail = NULL; - g_global_sched->global_runq_size = 0; + /* 初始化无锁全局队列 */ + atomic_init(&g_global_sched->global_runq_head, NULL); + atomic_init(&g_global_sched->global_runq_size, 0); pthread_mutex_init(&g_global_sched->global_runq_lock, NULL); /* 初始化空闲列表 */ @@ -221,35 +220,30 @@ coco_global_sched_t *coco_global_get(void) { } /** - * 全局运行队列入队 + * 全局运行队列入队 (无锁 Treiber Stack) */ int coco_global_runq_put(struct coco_coro *g) { if (!g_global_sched || !g) { return COCO_ERROR; } - atomic_store_explicit(&g->state, COCO_STATE_READY, memory_order_release); /* 设置就绪状态 */ + atomic_store_explicit(&g->state, COCO_STATE_READY, memory_order_release); - coco_preempt_block_signal(); - pthread_mutex_lock(&g_global_sched->global_runq_lock); + g->prev = NULL; - g->next = NULL; - g->prev = g_global_sched->global_runq_tail; + coco_coro_t *old_head; + do { + old_head = atomic_load_explicit(&g_global_sched->global_runq_head, memory_order_relaxed); + g->next = old_head; + } while (!atomic_compare_exchange_weak_explicit( + &g_global_sched->global_runq_head, + &old_head, g, + memory_order_release, + memory_order_relaxed)); - if (g_global_sched->global_runq_tail) { - g_global_sched->global_runq_tail->next = g; - } else { - g_global_sched->global_runq_head = g; - } - g_global_sched->global_runq_tail = g; - g_global_sched->global_runq_size++; - uint64_t runq_size_after_add = g_global_sched->global_runq_size; - - pthread_mutex_unlock(&g_global_sched->global_runq_lock); - coco_preempt_unblock_signal(); + uint64_t runq_size_after_add = atomic_fetch_add_explicit(&g_global_sched->global_runq_size, 1, memory_order_relaxed) + 1; - /* 按需唤醒空闲线程: 根据全局队列长度和空闲 worker 数量决定唤醒数量 - * 避免 thundering herd: 只唤醒最少数量的空闲线程 */ + /* 按需唤醒空闲线程 */ uint32_t idle = atomic_load(&g_global_sched->idle_count); if (idle > 0) { uint32_t to_wake = (uint32_t)(runq_size_after_add < (uint64_t)idle ? runq_size_after_add : idle); @@ -267,38 +261,30 @@ int coco_global_runq_put(struct coco_coro *g) { } /** - * 全局运行队列出队 + * 全局运行队列出队 (无锁 Treiber Stack) */ struct coco_coro *coco_global_runq_get(void) { if (!g_global_sched) { return NULL; } - coco_preempt_block_signal(); - pthread_mutex_lock(&g_global_sched->global_runq_lock); - - if (!g_global_sched->global_runq_head) { - pthread_mutex_unlock(&g_global_sched->global_runq_lock); - coco_preempt_unblock_signal(); - return NULL; - } - - struct coco_coro *g = g_global_sched->global_runq_head; - g_global_sched->global_runq_head = g->next; - - if (g_global_sched->global_runq_head) { - g_global_sched->global_runq_head->prev = NULL; - } else { - g_global_sched->global_runq_tail = NULL; - } + coco_coro_t *old_head; + do { + old_head = atomic_load_explicit(&g_global_sched->global_runq_head, memory_order_acquire); + if (!old_head) { + return NULL; + } + } while (!atomic_compare_exchange_weak_explicit( + &g_global_sched->global_runq_head, + &old_head, old_head->next, + memory_order_acquire, + memory_order_relaxed)); - g->next = NULL; - g->prev = NULL; - g_global_sched->global_runq_size--; + atomic_fetch_sub_explicit(&g_global_sched->global_runq_size, 1, memory_order_relaxed); - pthread_mutex_unlock(&g_global_sched->global_runq_lock); - coco_preempt_unblock_signal(); - return g; + old_head->next = NULL; + old_head->prev = NULL; + return old_head; } /** @@ -308,14 +294,7 @@ uint64_t coco_global_runq_size(void) { if (!g_global_sched) { return 0; } - - coco_preempt_block_signal(); - pthread_mutex_lock(&g_global_sched->global_runq_lock); - uint64_t size = g_global_sched->global_runq_size; - pthread_mutex_unlock(&g_global_sched->global_runq_lock); - coco_preempt_unblock_signal(); - - return size; + return atomic_load_explicit(&g_global_sched->global_runq_size, memory_order_relaxed); } /** @@ -646,7 +625,7 @@ int coco_global_sched_stop(void) { /* 清空全局队列 */ coco_preempt_block_signal(); pthread_mutex_lock(&gs->global_runq_lock); - coco_coro_t *g = gs->global_runq_head; + coco_coro_t *g = atomic_load(&gs->global_runq_head); while (g) { coco_coro_t *next = g->next; if (g->stack_base) { @@ -660,9 +639,8 @@ int coco_global_sched_stop(void) { free(g); g = next; } - gs->global_runq_head = NULL; - gs->global_runq_tail = NULL; - gs->global_runq_size = 0; + atomic_store(&gs->global_runq_head, NULL); + atomic_store(&gs->global_runq_size, 0); pthread_mutex_unlock(&gs->global_runq_lock); coco_preempt_unblock_signal(); diff --git a/src/sched/global_sched.h b/src/sched/global_sched.h index a644f29..c186e0c 100644 --- a/src/sched/global_sched.h +++ b/src/sched/global_sched.h @@ -44,11 +44,10 @@ typedef struct coco_global_sched { uint32_t processor_count; uint32_t processor_mask; - /* 全局运行队列 (互斥锁保护) */ - struct coco_coro *global_runq_head; - struct coco_coro *global_runq_tail; - uint64_t global_runq_size; - pthread_mutex_t global_runq_lock; + /* 全局运行队列 (无锁栈 — Treiber Stack) */ + _Atomic(struct coco_coro *) global_runq_head; /* 栈顶指针 */ + _Atomic uint64_t global_runq_size; /* 原子计数 */ + pthread_mutex_t global_runq_lock; /* 保留用于 destroy/reset */ /* 空闲 P 列表 */ struct coco_processor *idle_processors; diff --git a/src/sched/sched.c b/src/sched/sched.c index f310a93..2acfb2a 100644 --- a/src/sched/sched.c +++ b/src/sched/sched.c @@ -257,44 +257,16 @@ bool schedule_balanced(coco_global_sched_t *sched) { } } - /* Phase 2: 批量插入全局队列(不持有任何 local 锁) */ - - if (coco_preempt_block_signal() != COCO_OK) { - return false; /* 信号屏蔽失败,放弃本次负载均衡 */ - } - pthread_mutex_lock(&sched->global_runq_lock); - + /* Phase 2: 逐个推入无锁全局队列(不持有任何 local 锁) */ for (coco_coro_t *g = to_move_head; g;) { coco_coro_t *next = g->next; g->next = NULL; - g->prev = sched->global_runq_tail; - if (sched->global_runq_tail) { - sched->global_runq_tail->next = g; - } else { - sched->global_runq_head = g; - } - sched->global_runq_tail = g; + g->prev = NULL; + coco_global_runq_put(g); g = next; } - sched->global_runq_size += moved_count; - pthread_mutex_unlock(&sched->global_runq_lock); - (void) coco_preempt_unblock_signal(); - - /* 唤醒空闲 worker 从全局队列窃取 */ - uint32_t idle = atomic_load(&sched->idle_count); - if (idle > 0) { - uint32_t to_wake = (moved_count < idle) ? moved_count : idle; - if (coco_preempt_block_signal() != COCO_OK) { - return true; /* 队列已插入,唤醒失败不影响正确性 */ - } - pthread_mutex_lock(&sched->idle_lock); - for (uint32_t i = 0; i < to_wake; i++) { - pthread_cond_signal(&sched->idle_cond); - } - pthread_mutex_unlock(&sched->idle_lock); - (void) coco_preempt_unblock_signal(); - } + /* 唤醒逻辑已在 coco_global_runq_put 中处理 */ return true; } diff --git a/src/sched/sched_stats.c b/src/sched/sched_stats.c index c4f78b8..8eeaa88 100644 --- a/src/sched/sched_stats.c +++ b/src/sched/sched_stats.c @@ -66,7 +66,7 @@ int coco_global_sched_get_stats(coco_global_sched_stats_t *stats) { stats->context_switches = 0; /* 需要从各 P 累加 */ stats->steals_attempted = 0; /* 暂无此统计 */ stats->steals_succeeded = 0; /* 暂无此统计 */ - stats->global_queue_size = gs->global_runq_size; + stats->global_queue_size = atomic_load(&gs->global_runq_size); stats->p_count = gs->processor_count; /* 收集各 P 的统计 */ diff --git a/tests/unit/test_global_sched.c b/tests/unit/test_global_sched.c index e6f9d06..b7cbf0d 100644 --- a/tests/unit/test_global_sched.c +++ b/tests/unit/test_global_sched.c @@ -121,15 +121,15 @@ static void test_global_queue(void) { TEST_ASSERT(ret == 0, "入队 coro3 成功"); TEST_ASSERT(coco_global_runq_size() == 3, "队列大小为 3"); - /* 出队 (FIFO 顺序) */ + /* 出队 (LIFO 顺序 — Treiber Stack) */ coco_coro_t *g = coco_global_runq_get(); - TEST_ASSERT(g != NULL && g->id == 1, "出队 coro1 成功"); + TEST_ASSERT(g != NULL && g->id == 3, "出队 coro3 成功 (栈顶)"); g = coco_global_runq_get(); TEST_ASSERT(g != NULL && g->id == 2, "出队 coro2 成功"); g = coco_global_runq_get(); - TEST_ASSERT(g != NULL && g->id == 3, "出队 coro3 成功"); + TEST_ASSERT(g != NULL && g->id == 1, "出队 coro1 成功 (栈底)"); TEST_ASSERT(coco_global_runq_size() == 0, "队列大小为 0");