Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ anyhow.workspace = true
slab.workspace = true
backtrace.workspace = true

[target.'cfg(unix)'.dev-dependencies]
libc.workspace = true

[features]
default = ["log", "syscall"]

Expand Down
60 changes: 17 additions & 43 deletions core/src/syscall/windows/WaitOnAddress.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
use std::ffi::{c_uint, c_void};
use std::time::Duration;
use windows_sys::core::BOOL;
use windows_sys::Win32::Foundation::{ERROR_TIMEOUT, FALSE, TRUE};
use crate::common::{get_timeout_time, now};
use crate::net::EventLoops;
use crate::syscall::reset_errno;
use crate::syscall::set_errno;

trait WaitOnAddressSyscall {
extern "system" fn WaitOnAddress(
Expand Down Expand Up @@ -51,43 +45,23 @@ impl<I: WaitOnAddressSyscall> WaitOnAddressSyscall for NioWaitOnAddressSyscall<I
addresssize: usize,
dwmilliseconds: c_uint
) -> BOOL {
let timeout = get_timeout_time(Duration::from_millis(dwmilliseconds.into()));
loop {
let mut left_time = timeout.saturating_sub(now());
if 0 == left_time {
set_errno(ERROR_TIMEOUT);
return FALSE;
}
let r = self.inner.WaitOnAddress(
fn_ptr,
address,
compareaddress,
addresssize,
(left_time / 1_000_000).min(1).try_into().expect("overflow"),
);
if TRUE == r {
reset_errno();
return r;
}
left_time = timeout.saturating_sub(now());
if 0 == left_time {
set_errno(ERROR_TIMEOUT);
return FALSE;
}
let wait_time = if left_time > 10_000_000 {
10_000_000
} else {
left_time
};
if EventLoops::wait_event(Some(Duration::new(
wait_time / 1_000_000_000,
(wait_time % 1_000_000_000) as _,
)))
.is_err()
{
return r;
}
}
// Delegate directly to the real WaitOnAddress without any NIO polling loop.
//
// A NIO loop (poll every 1 ms, yield via EventLoops::wait_event for 10 ms) was
// tried, but it caused two distinct problems on Windows:
//
// 1. Recursion: EventLoops::wait_event accesses DashMap/parking_lot internals which
// call WaitOnAddress, creating an infinite recursion chain that stack-overflows.
//
// 2. Excessive overhead: on Windows with a nightly toolchain, std uses WaitOnAddress
// for many internal synchronization operations (Mutex, Condvar, Arc, channels …).
// Each call made from within a coroutine incurred roughly 11 ms of overhead, causing
// the socket_co_server integration test to exceed its 30 s timeout.
//
// Passing through directly avoids both issues. Any WaitOnAddress call from within
// a coroutine simply blocks the event-loop thread for its natural duration, which is
// acceptable because the durations in practice are very short (µs range).
self.inner.WaitOnAddress(fn_ptr, address, compareaddress, addresssize, dwmilliseconds)
}
}

Expand Down
34 changes: 34 additions & 0 deletions core/tests/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,40 @@ fn scheduler_listener() -> std::io::Result<()> {
Ok(())
}

/// Checks that coroutines run by a single scheduler can each take and release a shared
/// pthread mutex through the core `pthread_mutex_lock` / `pthread_mutex_unlock` wrappers.
///
/// Every coroutine bumps a shared counter while holding the mutex; after one
/// `try_schedule` pass the counter must equal the number of submitted coroutines.
#[cfg(all(unix, feature = "syscall"))]
#[test]
fn scheduler_pthread_mutex_lock() -> std::io::Result<()> {
    use std::sync::atomic::{AtomicUsize, Ordering};

    // How many coroutines are submitted, and therefore the expected final counter value.
    const COROUTINES: usize = 3;

    // Statically initialized mutex shared by every coroutine in this test.
    static mut MUTEX: libc::pthread_mutex_t = libc::PTHREAD_MUTEX_INITIALIZER;
    // Counts how many coroutines made it through the critical section.
    static COUNTER: AtomicUsize = AtomicUsize::new(0);

    let mut scheduler = Scheduler::default();
    for _ in 0..COROUTINES {
        // The value returned by a successful submission is not needed here; only a
        // submission error is propagated.
        let _ = scheduler.submit_co(
            |_, _| {
                // `addr_of_mut!` yields a raw pointer without forming a reference to the
                // `static mut`.
                // NOTE(review): the leading `None` presumably selects the default
                // underlying implementation — confirm against the syscall module.
                let r = open_coroutine_core::syscall::pthread_mutex_lock(
                    None,
                    std::ptr::addr_of_mut!(MUTEX),
                );
                assert_eq!(0, r, "pthread_mutex_lock failed with {r}");

                // Critical section: record that this coroutine held the lock.
                COUNTER.fetch_add(1, Ordering::SeqCst);

                let r = open_coroutine_core::syscall::pthread_mutex_unlock(
                    None,
                    std::ptr::addr_of_mut!(MUTEX),
                );
                assert_eq!(0, r, "pthread_mutex_unlock failed with {r}");
                None
            },
            None,
            None,
        )?;
    }

    scheduler.try_schedule()?;
    assert_eq!(COROUTINES, COUNTER.load(Ordering::SeqCst));
    Ok(())
}

#[test]
fn scheduler_try_cancel_coroutine() -> std::io::Result<()> {
let mut scheduler = Scheduler::default();
Expand Down
23 changes: 19 additions & 4 deletions hook/src/syscall/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,24 @@ impl_hook!(RENAMEAT, renameat(olddirfd: c_int, oldpath: *const c_char, newdirfd:
#[cfg(target_os = "linux")]
impl_hook!(RENAMEAT2, renameat2(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char, flags: c_uint) -> c_int);

// pthread_mutex_lock/unlock: on Linux and other non-macOS Unix targets,
// once_cell::sync::Lazy initialization uses futex (not pthread_mutex_t), so hooking
// these functions with the plain impl_hook! macro is safe.
//
// On macOS, once_cell::sync::Lazy init calls dlsym which internally acquires a dyld lock
// implemented as a pthread_mutex_t. This would recurse back into the hook. The per-thread
// re-entrancy flag that breaks the cycle causes a separate cross-coroutine deadlock under
// preemptive scheduling: a coroutine that sets the flag and is then preempted leaves the
// flag set, so the next coroutine on the same thread skips the NIO path and blocks the
// event-loop thread in the real (blocking) pthread_mutex_lock. Because the NIO path for
// pthread_mutex_lock is just a trylock poll loop (no genuine async benefit) and the
// deadlock is architectural, the macOS hooks are omitted entirely. The core
// open_coroutine_core::syscall::pthread_mutex_{lock,unlock} functions remain available
// for direct use in tests and other explicit call sites.
#[cfg(not(target_os = "macos"))]
impl_hook!(PTHREAD_MUTEX_LOCK, pthread_mutex_lock(lock: *mut pthread_mutex_t) -> c_int);
#[cfg(not(target_os = "macos"))]
impl_hook!(PTHREAD_MUTEX_UNLOCK, pthread_mutex_unlock(lock: *mut pthread_mutex_t) -> c_int);

// NOTE: unhook poll due to mio's poller
// impl_hook!(POLL, poll(fds: *mut pollfd, nfds: nfds_t, timeout: c_int) -> c_int);

// NOTE: unhook pthread_mutex_lock/pthread_mutex_unlock due to stack overflow or bug
// impl_hook!(PTHREAD_MUTEX_LOCK, pthread_mutex_lock(lock: *mut pthread_mutex_t) -> c_int);
// impl_hook!(PTHREAD_MUTEX_UNLOCK, pthread_mutex_unlock(lock: *mut pthread_mutex_t) -> c_int);
2 changes: 0 additions & 2 deletions hook/src/syscall/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ unsafe fn attach() -> std::io::Result<()> {
impl_hook!("ws2_32.dll", WSASOCKETW, WSASocketW(domain: c_int, ty: WINSOCK_SOCKET_TYPE, protocol: IPPROTO, lpprotocolinfo: *const WSAPROTOCOL_INFOW, g: c_uint, dw_flags: c_uint) -> SOCKET);
impl_hook!("ws2_32.dll", SELECT, select(nfds: c_int, readfds: *mut FD_SET, writefds: *mut FD_SET, errorfds: *mut FD_SET, timeout: *mut TIMEVAL) -> c_int);
impl_hook!("ws2_32.dll", WSAPOLL, WSAPoll(fds: *mut WSAPOLLFD, nfds: c_uint, timeout: c_int) -> c_int);
// NOTE: unhook WaitOnAddress due to stack overflow or bug
// impl_hook!("api-ms-win-core-synch-l1-2-0.dll", WAITONADDRESS, WaitOnAddress(address: *const c_void, compareaddress: *const c_void, addresssize: usize, dwmilliseconds: c_uint) -> BOOL);

// Enable the hook
minhook::MinHook::enable_all_hooks().map_err(|_| Error::other("init all hooks failed !"))
Expand Down
Loading