Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/core/hot_stack.c
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,13 @@ static void backup_coro_stack(coco_coro_t *coro, coco_hot_slot_t *slot) {
if (coro->stack_used == 0) return;
void *src = (char *)slot->stack_top - coro->stack_used;
if (coro->stack_backup_size < coro->stack_used) {
void *new_backup = realloc(coro->stack_backup, coro->stack_used);
/* 按 2x 增长预分配,最小 8KB,减少 realloc 次数 */
size_t new_size = coro->stack_backup_size ? coro->stack_backup_size * 2 : (8 * 1024);
if (new_size < coro->stack_used) new_size = coro->stack_used;
void *new_backup = realloc(coro->stack_backup, new_size);
if (!new_backup) return;
coro->stack_backup = new_backup;
coro->stack_backup_size = coro->stack_used;
coro->stack_backup_size = new_size;
}
memcpy(coro->stack_backup, src, coro->stack_used);
}
Expand Down
115 changes: 61 additions & 54 deletions src/core/stack_pool_multi.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,7 @@ static void flush_tl_cache_to_pool(stack_pool_multi_t *pool, bool free_all) {

/* 合并快速路径释放计数到全局统计 */
if (tl_fast_frees > 0) {
pthread_mutex_lock(&pool->lock);
pool->total_frees += tl_fast_frees;
pthread_mutex_unlock(&pool->lock);
atomic_fetch_add(&pool->total_frees, tl_fast_frees);
tl_fast_frees = 0;
}

Expand All @@ -77,21 +75,25 @@ static void flush_tl_cache_to_pool(stack_pool_multi_t *pool, bool free_all) {
if (free_all) {
free_stack_mmap(stack, pool->sizes[i]);
} else {
/* 尝试放回池中(带锁) */
pthread_mutex_lock(&pool->lock);
if (pool->counts[i] < pool->limits[i]) {
/* 尝试放回池中(带 per-class 锁) */
int class_idx = i;
coco_preempt_block_signal();
pthread_mutex_lock(&pool->locks[class_idx]);
if (pool->counts[class_idx] < pool->limits[class_idx]) {
size_t page_size = get_page_size();
size_t actual_size = pool->sizes[i];
size_t actual_size = pool->sizes[class_idx];
void *stack_base = (void*)((uintptr_t)stack - actual_size - page_size);
stack_node_multi_t *node = (stack_node_multi_t*)((uintptr_t)stack_base + page_size);
node->stack_top = stack;
node->size = actual_size;
node->next = pool->freelists[i];
pool->freelists[i] = node;
pool->counts[i]++;
pthread_mutex_unlock(&pool->lock);
node->next = pool->freelists[class_idx];
pool->freelists[class_idx] = node;
pool->counts[class_idx]++;
pthread_mutex_unlock(&pool->locks[class_idx]);
coco_preempt_unblock_signal();
} else {
pthread_mutex_unlock(&pool->lock);
pthread_mutex_unlock(&pool->locks[class_idx]);
coco_preempt_unblock_signal();
free_stack_mmap(stack, pool->sizes[i]);
}
}
Expand All @@ -108,33 +110,35 @@ static void *stack_pool_multi_alloc_fast(stack_pool_multi_t *pool, size_t size)
void *stack = tl_stack_cache.cache[class_idx];
tl_stack_cache.cache[class_idx] = NULL;
zero_stack(stack, pool->sizes[class_idx], pool->zero_mode);
atomic_fetch_add(&pool->total_allocs, 1);
atomic_fetch_add(&pool->pool_hits, 1);
return stack;
}
return NULL;
}

/**
* 慢速释放路径(带锁,回收到全局池或直接 munmap)
* 慢速释放路径(带 per-class 锁,回收到全局池或直接 munmap)
*/
static void stack_pool_multi_free_slow(stack_pool_multi_t *pool, void *stack_top, size_t size) {
coco_preempt_block_signal();
pthread_mutex_lock(&pool->lock);

pool->total_frees++;

int class_idx = stack_pool_multi_get_class_index(size);

/* 超出池范围,直接 munmap */
if (class_idx < 0) {
pthread_mutex_unlock(&pool->lock);
coco_preempt_unblock_signal();
free_stack_mmap(stack_top, size);
return;
}

pthread_mutex_lock(&pool->locks[class_idx]);

atomic_fetch_add(&pool->total_frees, 1);

/* 池已满,直接 munmap */
if (pool->counts[class_idx] >= pool->limits[class_idx]) {
pthread_mutex_unlock(&pool->lock);
pthread_mutex_unlock(&pool->locks[class_idx]);
coco_preempt_unblock_signal();
free_stack_mmap(stack_top, size);
return;
Expand All @@ -152,7 +156,7 @@ static void stack_pool_multi_free_slow(stack_pool_multi_t *pool, void *stack_top
pool->freelists[class_idx] = node;
pool->counts[class_idx]++;

pthread_mutex_unlock(&pool->lock);
pthread_mutex_unlock(&pool->locks[class_idx]);
coco_preempt_unblock_signal();
}

Expand All @@ -179,9 +183,14 @@ stack_pool_multi_t *stack_pool_multi_create(void) {
return NULL;
}

if (pthread_mutex_init(&pool->lock, NULL) != 0) {
free(pool);
return NULL;
for (int i = 0; i < STACK_POOL_MULTI_NUM_CLASSES; i++) {
if (pthread_mutex_init(&pool->locks[i], NULL) != 0) {
for (int j = 0; j < i; j++) {
pthread_mutex_destroy(&pool->locks[j]);
}
free(pool);
return NULL;
}
}

/* 初始化 size classes */
Expand All @@ -195,11 +204,11 @@ stack_pool_multi_t *stack_pool_multi_create(void) {
/* 默认使用栈顶 1KB 清零模式 */
pool->zero_mode = STACK_ZERO_TOP_1K;

/* 初始化统计 */
pool->total_allocs = 0;
pool->total_frees = 0;
pool->pool_hits = 0;
pool->pool_misses = 0;
/* 初始化统计 — 原子变量 */
atomic_init(&pool->total_allocs, 0);
atomic_init(&pool->total_frees, 0);
atomic_init(&pool->pool_hits, 0);
atomic_init(&pool->pool_misses, 0);

return pool;
}
Expand All @@ -225,12 +234,14 @@ void stack_pool_multi_destroy(stack_pool_multi_t *pool) {
}
}

pthread_mutex_destroy(&pool->lock);
for (int i = 0; i < STACK_POOL_MULTI_NUM_CLASSES; i++) {
pthread_mutex_destroy(&pool->locks[i]);
}
free(pool);
}

/**
* 从栈池分配栈(优先走快速路径)
* 从栈池分配栈(优先走快速路径,再走细粒度锁路径
*/
void *stack_pool_multi_alloc(stack_pool_multi_t *pool, size_t size) {
if (!pool) {
Expand All @@ -240,52 +251,48 @@ void *stack_pool_multi_alloc(stack_pool_multi_t *pool, size_t size) {
/* 快速路径:线程局部缓存 */
void *fast = stack_pool_multi_alloc_fast(pool, size);
if (fast) {
coco_preempt_block_signal();
pthread_mutex_lock(&pool->lock);
pool->total_allocs++;
pool->pool_hits++;
pthread_mutex_unlock(&pool->lock);
coco_preempt_unblock_signal();
return fast;
}

coco_preempt_block_signal();
pthread_mutex_lock(&pool->lock);

pool->total_allocs++;
/* 慢速路径:细粒度锁 */
atomic_fetch_add(&pool->total_allocs, 1);

int class_idx = stack_pool_multi_get_class_index(size);
void *result = NULL;

/* 超出池范围,直接 mmap */
if (class_idx < 0) {
pool->pool_misses++;
atomic_fetch_add(&pool->pool_misses, 1);
result = alloc_stack_mmap(size);
goto done;
return result;
}

coco_preempt_block_signal();
pthread_mutex_lock(&pool->locks[class_idx]);

/* 尝试从空闲链表获取 */
stack_node_multi_t *node = pool->freelists[class_idx];
if (node) {
pool->freelists[class_idx] = node->next;
pool->counts[class_idx]--;
pool->pool_hits++;
atomic_fetch_add(&pool->pool_hits, 1);

result = node->stack_top;
size_t actual_size = node->size;

/* 选择性清零栈 */
pthread_mutex_unlock(&pool->locks[class_idx]);
coco_preempt_unblock_signal();

/* 选择性清零栈 — 在锁外执行,减少临界区 */
zero_stack(result, actual_size, pool->zero_mode);
goto done;
return result;
}

/* 空闲链表为空,分配新栈 */
pool->pool_misses++;
result = alloc_stack_mmap(pool->sizes[class_idx]);

done:
pthread_mutex_unlock(&pool->lock);
atomic_fetch_add(&pool->pool_misses, 1);
pthread_mutex_unlock(&pool->locks[class_idx]);
coco_preempt_unblock_signal();
result = alloc_stack_mmap(pool->sizes[class_idx]);
return result;
}

Expand Down Expand Up @@ -322,8 +329,8 @@ void stack_pool_multi_get_stats(stack_pool_multi_t *pool,
/* 刷新当前线程缓存以确保统计准确 */
flush_tl_cache_to_pool(pool, false);

if (total_allocs) *total_allocs = pool->total_allocs;
if (total_frees) *total_frees = pool->total_frees;
if (pool_hits) *pool_hits = pool->pool_hits;
if (pool_misses) *pool_misses = pool->pool_misses;
}
if (total_allocs) *total_allocs = atomic_load(&pool->total_allocs);
if (total_frees) *total_frees = atomic_load(&pool->total_frees);
if (pool_hits) *pool_hits = atomic_load(&pool->pool_hits);
if (pool_misses) *pool_misses = atomic_load(&pool->pool_misses);
}
12 changes: 6 additions & 6 deletions src/core/stack_pool_multi.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ typedef struct stack_pool_multi {
uint32_t limits[STACK_POOL_MULTI_NUM_CLASSES]; /* 上限 */
stack_zero_mode_t zero_mode; /* 清零模式 */

/* 统计信息 */
uint64_t total_allocs; /* 总分配次数 */
uint64_t total_frees; /* 总释放次数 */
uint64_t pool_hits; /* 池命中次数 */
uint64_t pool_misses; /* 池未命中次数 */
/* 统计信息 — 原子变量,无需锁 */
_Atomic uint64_t total_allocs; /* 总分配次数 */
_Atomic uint64_t total_frees; /* 总释放次数 */
_Atomic uint64_t pool_hits; /* 池命中次数 */
_Atomic uint64_t pool_misses; /* 池未命中次数 */

pthread_mutex_t lock; /* 保护所有字段的互斥锁 */
pthread_mutex_t locks[STACK_POOL_MULTI_NUM_CLASSES]; /* 每类一把锁 */
} stack_pool_multi_t;

/* API */
Expand Down
52 changes: 39 additions & 13 deletions src/io/poll_iouring.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ typedef struct coco_iouring {
bool sqpoll_enabled; /* SQPOLL 是否启用 */
uint32_t entries; /* 队列深度 */

/* 请求池 */
/* 请求池 — 动态分配,与 entries 对齐 */
iouring_req_t *req_freelist; /* 空闲请求链表 */
iouring_req_t req_pool[256]; /* 预分配请求池 */
iouring_req_t *req_pool; /* 动态请求池 */
uint32_t req_pool_size; /* 池大小 */

/* 统计 */
uint64_t submit_count; /* 提交次数 */
Expand All @@ -78,21 +79,42 @@ typedef struct coco_iouring {
/* 线程局部 io_uring 上下文 */
static _Thread_local coco_iouring_t *g_iouring = NULL;

/* 动态扩容请求池 */
static int req_pool_grow(coco_iouring_t *iou) {
uint32_t new_size = iou->req_pool_size ? iou->req_pool_size * 2 : iou->entries;
iouring_req_t *new_pool = realloc(iou->req_pool, new_size * sizeof(iouring_req_t));
if (!new_pool) return COCO_ERROR;

/* 新分配的节点加入空闲链表 */
for (uint32_t i = iou->req_pool_size; i < new_size; i++) {
memset(&new_pool[i], 0, sizeof(iouring_req_t));
new_pool[i].next = iou->req_freelist;
iou->req_freelist = &new_pool[i];
}

iou->req_pool = new_pool;
iou->req_pool_size = new_size;
return COCO_OK;
}

/* 从池中分配请求 */
static iouring_req_t *req_alloc(coco_iouring_t *iou) {
if (iou && iou->req_freelist) {
iouring_req_t *req = iou->req_freelist;
iou->req_freelist = req->next;
memset(req, 0, sizeof(iouring_req_t));
return req;
if (!iou) return calloc(1, sizeof(iouring_req_t));
if (!iou->req_freelist) {
if (req_pool_grow(iou) != COCO_OK) {
return calloc(1, sizeof(iouring_req_t));
}
}
return calloc(1, sizeof(iouring_req_t));
iouring_req_t *req = iou->req_freelist;
iou->req_freelist = req->next;
memset(req, 0, sizeof(iouring_req_t));
return req;
}

/* 归还请求到池 */
static void req_free(coco_iouring_t *iou, iouring_req_t *req) {
if (!req) return;
if (iou && req >= &iou->req_pool[0] && req <= &iou->req_pool[255]) {
if (iou && iou->req_pool && req >= iou->req_pool && req < iou->req_pool + iou->req_pool_size) {
req->next = iou->req_freelist;
iou->req_freelist = req;
} else {
Expand Down Expand Up @@ -131,10 +153,10 @@ int coco_poll_init_iouring(coco_sched_t *sched) {

iou->entries = opts.queue_depth > 0 ? opts.queue_depth : IOURING_ENTRIES;

/* 初始化请求池 */
for (int i = 0; i < 256; i++) {
iou->req_pool[i].next = iou->req_freelist;
iou->req_freelist = &iou->req_pool[i];
/* 动态请求池:初始大小 = entries */
if (req_pool_grow(iou) != COCO_OK) {
free(iou);
return COCO_ERROR;
}

/* 尝试启用 SQPOLL */
Expand All @@ -161,6 +183,7 @@ int coco_poll_init_iouring(coco_sched_t *sched) {
sched->fd_table = fd_table_create(1024);
if (!sched->fd_table) {
io_uring_queue_exit(&iou->ring);
free(iou->req_pool);
free(iou);
return COCO_ERROR;
}
Expand All @@ -172,6 +195,7 @@ int coco_poll_init_iouring(coco_sched_t *sched) {

/* 默认模式初始化 */
if (io_uring_queue_init(iou->entries, &iou->ring, 0) < 0) {
free(iou->req_pool);
free(iou);
return COCO_ERROR;
}
Expand All @@ -185,6 +209,7 @@ int coco_poll_init_iouring(coco_sched_t *sched) {
sched->fd_table = fd_table_create(1024);
if (!sched->fd_table) {
io_uring_queue_exit(&iou->ring);
free(iou->req_pool);
free(iou);
return COCO_ERROR;
}
Expand All @@ -200,6 +225,7 @@ void coco_poll_cleanup_iouring(coco_sched_t *sched) {

coco_iouring_t *iou = sched->iouring;
io_uring_queue_exit(&iou->ring);
free(iou->req_pool);
free(iou);
sched->iouring = NULL;
g_iouring = NULL;
Expand Down
Loading
Loading