From ce7c37d1c9c43a99816e204a2757f33ed76b69b8 Mon Sep 17 00:00:00 2001 From: Tim Blechmann Date: Fri, 1 May 2026 09:27:49 +0800 Subject: [PATCH 1/6] syscall: API improvements --- CMakeLists.txt | 1 + include/nova/sync/detail/syscall.hpp | 52 ++++++++++++++----- .../sync/event/native_auto_reset_event.cpp | 24 ++++----- .../sync/event/native_manual_reset_event.cpp | 24 +++++---- source/nova/sync/mutex/eventfd_mutex.cpp | 20 +++---- .../nova/sync/semaphore/eventfd_semaphore.cpp | 10 ++-- 6 files changed, 80 insertions(+), 51 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 74e69fc..d974c13 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -28,6 +28,7 @@ set(headers include/nova/sync/detail/async_support.hpp include/nova/sync/detail/backoff.hpp include/nova/sync/detail/compat.hpp + include/nova/sync/detail/native_handle_support.hpp include/nova/sync/detail/pause.hpp include/nova/sync/detail/syscall.hpp include/nova/sync/detail/timed_wait.hpp diff --git a/include/nova/sync/detail/syscall.hpp b/include/nova/sync/detail/syscall.hpp index 0572132..d3407fc 100644 --- a/include/nova/sync/detail/syscall.hpp +++ b/include/nova/sync/detail/syscall.hpp @@ -4,6 +4,7 @@ #pragma once #include +#include #if __has_include( ) && __has_include( ) && __has_include( ) # include @@ -15,20 +16,20 @@ namespace nova::sync::detail { // Retry a POSIX read() across EINTR. Returns the same return value as read(). -inline ssize_t read_intr( int fd, void* buf, size_t count ) noexcept +inline ssize_t read_intr( int fd, std::span< std::byte > buf ) noexcept { ssize_t rc; do { - rc = ::read( fd, buf, count ); + rc = ::read( fd, buf.data(), buf.size() ); } while ( rc < 0 && errno == EINTR ); return rc; } -inline ssize_t write_intr( int fd, const void* buf, size_t count ) noexcept +inline ssize_t write_intr( int fd, std::span< const std::byte > buf ) noexcept { - const unsigned char* p = static_cast< const unsigned char* >( buf ); - size_t remaining = count; - size_t offset = 0; + const std::byte* p = buf.data(); + size_t remaining = buf.size(); + size_t offset = 0; while ( remaining > 0 ) { ssize_t rc = ::write( fd, p + offset, remaining ); @@ -42,25 +43,45 @@ inline ssize_t write_intr( int fd, const void* buf, size_t count ) noexcept offset += rc; } - return ssize_t( count ); + return ssize_t( buf.size() ); } -inline int poll_intr( struct pollfd* fds, nfds_t nfds ) noexcept + +inline int poll_intr( std::span< struct pollfd > fds ) noexcept { int rc; do { - rc = ::poll( fds, nfds, -1 ); + rc = ::poll( fds.data(), nfds_t( fds.size() ), -1 ); } while ( rc < 0 && errno == EINTR ); return rc; } -inline int poll_intr( struct pollfd* fds, nfds_t nfds, std::chrono::milliseconds timeout ) noexcept +inline int poll_intr( struct pollfd& fd ) noexcept +{ + return poll_intr( std::span< struct pollfd >( &fd, 1 ) ); +} + + +inline int try_poll( std::span< struct pollfd > fds ) noexcept { - using clock = std::chrono::steady_clock; - auto start = clock::now(); + return ::poll( fds.data(), nfds_t( fds.size() ), 0 ); +} + +inline int try_poll( struct pollfd& fd ) noexcept +{ + return try_poll( std::span< struct pollfd >( &fd, 1 ) ); +} + +inline int poll_intr( std::span< struct pollfd > fds, std::chrono::milliseconds timeout ) noexcept +{ + if ( timeout <= std::chrono::milliseconds::zero() ) + return try_poll( fds ); + + using clock = std::chrono::steady_clock; + const auto start = clock::now(); while ( true ) { int rem = int( std::chrono::duration_cast< std::chrono::milliseconds >( timeout ).count() ); - int rc = ::poll( fds, nfds, rem ); + int rc = ::poll( fds.data(), nfds_t( fds.size() ), rem ); if ( rc < 0 ) { if ( errno == EINTR ) { auto elapsed = clock::now() - start; @@ -74,6 +95,11 @@ inline int poll_intr( struct pollfd* fds, nfds_t nfds, std::chrono::milliseconds } } +inline int poll_intr( struct pollfd& fd, std::chrono::milliseconds timeout ) noexcept +{ + return poll_intr( std::span< struct pollfd >( &fd, 1 ), timeout ); +} + } // namespace nova::sync::detail #endif diff --git a/source/nova/sync/event/native_auto_reset_event.cpp b/source/nova/sync/event/native_auto_reset_event.cpp index 70f66cc..e87199d 100644 --- a/source/nova/sync/event/native_auto_reset_event.cpp +++ b/source/nova/sync/event/native_auto_reset_event.cpp @@ -35,8 +35,8 @@ native_auto_reset_event::native_auto_reset_event( bool initially_set ) noexcept #elif defined( __linux__ ) handle_.reset( ::eventfd( 0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE ) ); if ( initially_set ) { - uint64_t val = 1; - detail::write_intr( handle_.get(), &val, sizeof( val ) ); + const std::array< uint64_t, 1 > val { 1 }; + detail::write_intr( handle_.get(), std::as_bytes( std::span( val ) ) ); } #elif defined( __APPLE__ ) handle_.reset( ::kqueue() ); @@ -88,12 +88,12 @@ void native_auto_reset_event::signal() noexcept // Only apply idempotency check if no threads are waiting if ( wait_count_.load( std::memory_order_acquire ) == 0 ) { struct pollfd pfd = { handle_.get(), POLLIN, 0 }; - if ( detail::poll_intr( &pfd, 1, 0ms ) > 0 ) { + if ( detail::try_poll( pfd ) > 0 ) { return; // Already set, don't write } } - uint64_t val = 1; - detail::write_intr( handle_.get(), &val, sizeof( val ) ); + std::array< uint64_t, 1 > val { 1 }; + detail::write_intr( handle_.get(), std::as_bytes( std::span( val ) ) ); #elif defined( __APPLE__ ) // Strategy: // • token_count_ tracks unconsumed tokens. @@ -133,7 +133,7 @@ void native_auto_reset_event::signal() noexcept // trigger once a token is consumed. #else uint8_t dummy = 1; - detail::write_intr( fds_[ 1 ].get(), &dummy, 1 ); + detail::write_intr( fds_[ 1 ].get(), std::as_bytes( std::span( &dummy, 1 ) ) ); #endif } @@ -142,8 +142,8 @@ bool native_auto_reset_event::try_wait() noexcept #if defined( _WIN32 ) return ::WaitForSingleObject( handle_.get(), 0 ) == WAIT_OBJECT_0; #elif defined( __linux__ ) - uint64_t val; - return detail::read_intr( handle_.get(), &val, sizeof( val ) ) == ssize_t( sizeof( val ) ); + std::array< uint64_t, 1 > val {}; + return detail::read_intr( handle_.get(), as_writable_bytes( std::span( val ) ) ) == sizeof( uint64_t ); #elif defined( __APPLE__ ) // Attempt to consume one token via CAS. int s = token_count_.load( std::memory_order_acquire ); @@ -155,7 +155,7 @@ bool native_auto_reset_event::try_wait() noexcept #else uint8_t buf[ 128 ]; bool signaled = false; - while ( detail::read_intr( fds_[ 0 ].get(), buf, sizeof( buf ) ) > 0 ) + while ( detail::read_intr( fds_[ 0 ].get(), std::as_writable_bytes( std::span( buf ) ) ) > 0 ) signaled = true; return signaled; #endif @@ -169,7 +169,7 @@ void native_auto_reset_event::wait() noexcept wait_count_.fetch_add( 1, std::memory_order_acquire ); while ( !try_wait() ) { struct pollfd pfd = { handle_.get(), POLLIN, 0 }; - detail::poll_intr( &pfd, 1 ); + detail::poll_intr( pfd ); } wait_count_.fetch_sub( 1, std::memory_order_release ); #elif defined( __APPLE__ ) @@ -206,7 +206,7 @@ void native_auto_reset_event::wait() noexcept #else while ( !try_wait() ) { struct pollfd pfd = { native_handle(), POLLIN, 0 }; - detail::poll_intr( &pfd, 1 ); + detail::poll_intr( pfd ); } #endif } @@ -255,7 +255,7 @@ bool native_auto_reset_event::try_wait_for( duration_type timeout ) noexcept return result; # else struct pollfd pfd = { native_handle(), POLLIN, 0 }; - detail::poll_intr( &pfd, 1, timeout ); + detail::poll_intr( pfd, timeout ); return try_wait(); # endif #endif diff --git a/source/nova/sync/event/native_manual_reset_event.cpp b/source/nova/sync/event/native_manual_reset_event.cpp index 624d550..1c89c28 100644 --- a/source/nova/sync/event/native_manual_reset_event.cpp +++ b/source/nova/sync/event/native_manual_reset_event.cpp @@ -8,6 +8,7 @@ #if defined( _WIN32 ) # include #else +# include # include # include # include @@ -67,9 +68,9 @@ void native_manual_reset_event::signal() noexcept ::SetEvent( handle_.get() ); #elif defined( __linux__ ) struct pollfd pfd = { handle_.get(), POLLIN, 0 }; - if ( detail::poll_intr( &pfd, 1, 0ms ) == 0 ) { - uint64_t val = 1; - detail::write_intr( handle_.get(), &val, sizeof( val ) ); + if ( detail::try_poll( pfd ) == 0 ) { + const std::array< uint64_t, 1 > val { 1 }; + detail::write_intr( handle_.get(), std::as_bytes( std::span( val ) ) ); } #elif defined( __APPLE__ ) struct kevent ev; @@ -77,9 +78,9 @@ void native_manual_reset_event::signal() noexcept ::kevent( handle_.get(), &ev, 1, nullptr, 0, nullptr ); #else struct pollfd pfd = { fds_[ 0 ].get(), POLLIN, 0 }; - if ( detail::poll_intr( &pfd, 1, 0ms ) == 0 ) { + if ( detail::try_poll( pfd ) == 0 ) { uint8_t dummy = 1; - detail::write_intr( fds_[ 1 ].get(), &dummy, 1 ); + detail::write_intr( fds_[ 1 ].get(), std::as_bytes( std::span( &dummy, 1 ) ) ); } #endif } @@ -89,8 +90,9 @@ void native_manual_reset_event::reset() noexcept #if defined( _WIN32 ) ::ResetEvent( handle_.get() ); #elif defined( __linux__ ) - uint64_t val; - while ( detail::read_intr( handle_.get(), &val, sizeof( val ) ) == ssize_t( sizeof( val ) ) ) {} + std::array< uint64_t, 1 > val {}; + while ( detail::read_intr( handle_.get(), std::as_writable_bytes( std::span( val ) ) ) + == ssize_t( sizeof( uint64_t ) ) ) {} #elif defined( __APPLE__ ) struct kevent ev; EV_SET( &ev, 1, EVFILT_USER, EV_DELETE, 0, 0, nullptr ); @@ -99,7 +101,7 @@ void native_manual_reset_event::reset() noexcept ::kevent( handle_.get(), &ev, 1, nullptr, 0, nullptr ); #else uint8_t buf[ 128 ]; - while ( detail::read_intr( fds_[ 0 ].get(), buf, sizeof( buf ) ) > 0 ) {} + while ( detail::read_intr( fds_[ 0 ].get(), std::as_writable_bytes( std::span( buf ) ) ) > 0 ) {} #endif } @@ -109,7 +111,7 @@ bool native_manual_reset_event::try_wait() const noexcept return ::WaitForSingleObject( handle_.get(), 0 ) == WAIT_OBJECT_0; #else struct pollfd pfd = { native_handle(), POLLIN, 0 }; - int rc = detail::poll_intr( &pfd, 1, 0ms ); + int rc = detail::try_poll( pfd ); return rc > 0; #endif } @@ -121,7 +123,7 @@ void native_manual_reset_event::wait() const noexcept #else while ( !try_wait() ) { struct pollfd pfd = { native_handle(), POLLIN, 0 }; - detail::poll_intr( &pfd, 1 ); + detail::poll_intr( pfd ); } #endif } @@ -144,7 +146,7 @@ bool native_manual_reset_event::try_wait_for( duration_type timeout ) const noex return false; # else struct pollfd pfd = { native_handle(), POLLIN, 0 }; - int rc = detail::poll_intr( &pfd, 1, timeout ); + int rc = detail::poll_intr( pfd, timeout ); if ( rc <= 0 ) return try_wait(); return try_wait(); diff --git a/source/nova/sync/mutex/eventfd_mutex.cpp b/source/nova/sync/mutex/eventfd_mutex.cpp index 71d1c67..e1c5832 100644 --- a/source/nova/sync/mutex/eventfd_mutex.cpp +++ b/source/nova/sync/mutex/eventfd_mutex.cpp @@ -44,20 +44,20 @@ void eventfd_mutex::lock() noexcept POLLIN, 0, }; - detail::poll_intr( &pfd, 1 ); + detail::poll_intr( pfd ); } } bool eventfd_mutex::try_lock() noexcept { - uint64_t val = 0; - return detail::read_intr( evfd_, &val, sizeof( val ) ) == sizeof( val ); + std::array< uint64_t, 1 > val {}; + return detail::read_intr( evfd_, std::as_writable_bytes( std::span( val ) ) ) == ssize_t( sizeof( uint64_t ) ); } void eventfd_mutex::unlock() noexcept { - const uint64_t one = 1; - detail::write_intr( evfd_, &one, sizeof( one ) ); + const std::array< uint64_t, 1 > one { 1 }; + detail::write_intr( evfd_, std::as_bytes( std::span( one ) ) ); } // --------------------------------------------------------------------------- @@ -84,15 +84,15 @@ void fast_eventfd_mutex::unlock() noexcept uint32_t prev = state_.fetch_and( ~1u, std::memory_order_release ); if ( prev > 1 ) { - const uint64_t one = 1; - detail::write_intr( evfd_, &one, sizeof( one ) ); + const std::array< uint64_t, 1 > one { 1 }; + detail::write_intr( evfd_, std::as_bytes( std::span( one ) ) ); } } void fast_eventfd_mutex::consume_lock() const noexcept { - uint64_t val = 0; - detail::read_intr( evfd_, &val, sizeof( val ) ); + std::array< uint64_t, 1 > val {}; + detail::read_intr( evfd_, std::as_writable_bytes( std::span( val ) ) ); } // Slow path for lock acquisition, entered after try_lock() fails. @@ -154,7 +154,7 @@ void fast_eventfd_mutex::lock_slow() noexcept POLLIN, 0, }; - detail::poll_intr( &pfd, 1 ); + detail::poll_intr( pfd ); consume_lock(); s = state_.load( std::memory_order_acquire ); diff --git a/source/nova/sync/semaphore/eventfd_semaphore.cpp b/source/nova/sync/semaphore/eventfd_semaphore.cpp index c7047f0..07230c1 100644 --- a/source/nova/sync/semaphore/eventfd_semaphore.cpp +++ b/source/nova/sync/semaphore/eventfd_semaphore.cpp @@ -30,8 +30,8 @@ eventfd_semaphore::~eventfd_semaphore() void eventfd_semaphore::release( std::ptrdiff_t n ) noexcept { assert( n >= 0 && "eventfd_semaphore::release: n must be non-negative" ); - const uint64_t val = uint64_t( n ); - detail::write_intr( evfd_, &val, sizeof( val ) ); + const std::array< uint64_t, 1 > val { uint64_t( n ) }; + detail::write_intr( evfd_, std::as_bytes( std::span( val ) ) ); } void eventfd_semaphore::acquire() noexcept @@ -42,14 +42,14 @@ void eventfd_semaphore::acquire() noexcept POLLIN, 0, }; - detail::poll_intr( &pfd, 1 ); + detail::poll_intr( pfd ); } } bool eventfd_semaphore::try_acquire() noexcept { - uint64_t val = 0; - return detail::read_intr( evfd_, &val, sizeof( val ) ) == sizeof( val ); + std::array< uint64_t, 1 > val {}; + return detail::read_intr( evfd_, std::as_writable_bytes( std::span( val ) ) ) == ssize_t( sizeof( uint64_t ) ); } } // namespace nova::sync From 9cfcf72e2907d5c5e66ce92316ebfb6a5080fa27 Mon Sep 17 00:00:00 2001 From: Tim Blechmann Date: Fri, 1 May 2026 08:32:41 +0800 Subject: [PATCH 2/6] cmake: cleanup --- CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index d974c13..786bf52 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -217,8 +217,8 @@ if(NOVA_SYNC_TESTS) ) endif() - if ("CMAKE_CXX_COMPILER_ID MATCHES Clang OR CMAKE_CXX_COMPILER_ID MATCHES AppleClang") - add_compile_options(nova_sync PRIVATE -Wthread-safety) + if(CMAKE_CXX_COMPILER_ID MATCHES Clang OR CMAKE_CXX_COMPILER_ID MATCHES AppleClang) + target_compile_options(nova_sync PRIVATE -Wthread-safety) endif() add_executable(nova_sync_tests From df3b5d79d820f032400be6546086ec2b638aba4c Mon Sep 17 00:00:00 2001 From: Tim Blechmann Date: Fri, 1 May 2026 08:37:04 +0800 Subject: [PATCH 3/6] async_support: add missing include guard --- include/nova/sync/semaphore/detail/async_support.hpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/include/nova/sync/semaphore/detail/async_support.hpp b/include/nova/sync/semaphore/detail/async_support.hpp index 0a6bfd7..3e71b4e 100644 --- a/include/nova/sync/semaphore/detail/async_support.hpp +++ b/include/nova/sync/semaphore/detail/async_support.hpp @@ -1,3 +1,5 @@ +#pragma once + /// Semaphore async handlers invoke with `expected`. /// Semaphore acquire completes synchronously; no lock guard returned (cf. events). /// From db1c150f040a0b02d8c4518a3b1cc460f49ba1a4 Mon Sep 17 00:00:00 2001 From: Tim Blechmann Date: Fri, 1 May 2026 08:50:33 +0800 Subject: [PATCH 4/6] atomic wait: fix memory order --- source/nova/sync/futex/atomic_wait.cpp | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/source/nova/sync/futex/atomic_wait.cpp b/source/nova/sync/futex/atomic_wait.cpp index 728052a..820399a 100644 --- a/source/nova/sync/futex/atomic_wait.cpp +++ b/source/nova/sync/futex/atomic_wait.cpp @@ -363,6 +363,8 @@ bool atomic_wait_for( std::atomic< int32_t >& atom, return atom.load( std::memory_order_relaxed ) != old; } ); + if ( order != std::memory_order_relaxed ) + std::atomic_thread_fence( std::memory_order_acquire ); return atom.load( std::memory_order_relaxed ) != old; } @@ -387,6 +389,8 @@ bool atomic_wait_until( std::atomic< int32_t >& return atom.load( std::memory_order_relaxed ) != old; } ); + if ( order != std::memory_order_relaxed ) + std::atomic_thread_fence( std::memory_order_acquire ); return atom.load( std::memory_order_relaxed ) != old; } @@ -411,6 +415,8 @@ bool atomic_wait_until( std::atomic< int32_t >& return atom.load( std::memory_order_relaxed ) != old; } ); + if ( order != std::memory_order_relaxed ) + std::atomic_thread_fence( std::memory_order_acquire ); return atom.load( std::memory_order_relaxed ) != old; } From 6f18fe2e2d2e8a7d6ee05af6131d1b50c1823c0f Mon Sep 17 00:00:00 2001 From: Tim Blechmann Date: Fri, 1 May 2026 08:35:18 +0800 Subject: [PATCH 5/6] libdispatch: refine memory barriers --- include/nova/sync/event/support/libdispatch_support.hpp | 4 ++-- include/nova/sync/mutex/support/libdispatch_support.hpp | 4 ++-- include/nova/sync/semaphore/support/libdispatch_support.hpp | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/include/nova/sync/event/support/libdispatch_support.hpp b/include/nova/sync/event/support/libdispatch_support.hpp index 44a5e2b..d62e3c7 100644 --- a/include/nova/sync/event/support/libdispatch_support.hpp +++ b/include/nova/sync/event/support/libdispatch_support.hpp @@ -77,14 +77,14 @@ struct dispatch_event_cancel_state void cancel() noexcept { - cancelled.store( true, std::memory_order_release ); + cancelled.store( true, std::memory_order_seq_cst ); if ( cancellation_callback_ ) cancellation_callback_(); } bool is_cancelled() const noexcept { - return cancelled.load( std::memory_order_acquire ); + return cancelled.load( std::memory_order_seq_cst ); } bool try_mark_invoked() noexcept diff --git a/include/nova/sync/mutex/support/libdispatch_support.hpp b/include/nova/sync/mutex/support/libdispatch_support.hpp index 7ca5de5..925844f 100644 --- a/include/nova/sync/mutex/support/libdispatch_support.hpp +++ b/include/nova/sync/mutex/support/libdispatch_support.hpp @@ -91,13 +91,13 @@ struct dispatch_cancel_state void cancel() noexcept { - cancelled.store( true, std::memory_order_release ); + cancelled.store( true, std::memory_order_seq_cst ); if ( cancellation_callback_ ) cancellation_callback_(); } bool is_cancelled() const noexcept { - return cancelled.load( std::memory_order_acquire ); + return cancelled.load( std::memory_order_seq_cst ); } bool try_mark_invoked() noexcept { diff --git a/include/nova/sync/semaphore/support/libdispatch_support.hpp b/include/nova/sync/semaphore/support/libdispatch_support.hpp index 7516343..70e890d 100644 --- a/include/nova/sync/semaphore/support/libdispatch_support.hpp +++ b/include/nova/sync/semaphore/support/libdispatch_support.hpp @@ -73,14 +73,14 @@ struct dispatch_semaphore_cancel_state void cancel() noexcept { - cancelled.store( true, std::memory_order_release ); + cancelled.store( true, std::memory_order_seq_cst ); if ( cancellation_callback_ ) cancellation_callback_(); } bool is_cancelled() const noexcept { - return cancelled.load( std::memory_order_acquire ); + return cancelled.load( std::memory_order_seq_cst ); } bool try_mark_invoked() noexcept From 78889b804f0da57810175c419ec0a409e97d4a1d Mon Sep 17 00:00:00 2001 From: Tim Blechmann Date: Fri, 1 May 2026 10:32:41 +0800 Subject: [PATCH 6/6] futex: fix wakeup --- source/nova/sync/futex/atomic_wait.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/nova/sync/futex/atomic_wait.cpp b/source/nova/sync/futex/atomic_wait.cpp index 820399a..bf2b06b 100644 --- a/source/nova/sync/futex/atomic_wait.cpp +++ b/source/nova/sync/futex/atomic_wait.cpp @@ -423,14 +423,16 @@ bool atomic_wait_until( std::atomic< int32_t >& void atomic_notify_one( std::atomic< int32_t >& atom ) noexcept { atom.notify_one(); - auto& b = bucket_for( &atom ); + auto& b = bucket_for( &atom ); + std::lock_guard lock( b.mutex ); b.cv.notify_one(); } void atomic_notify_all( std::atomic< int32_t >& atom ) noexcept { atom.notify_all(); - auto& b = bucket_for( &atom ); + auto& b = bucket_for( &atom ); + std::lock_guard lock( b.mutex ); b.cv.notify_all(); }