diff --git a/CMakeLists.txt b/CMakeLists.txt
index e8f9010..74e69fc 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -31,7 +31,7 @@ set(headers
     include/nova/sync/detail/pause.hpp
     include/nova/sync/detail/syscall.hpp
     include/nova/sync/detail/timed_wait.hpp
-
+    include/nova/sync/futex/atomic_wait.hpp
     include/nova/sync/event/auto_reset_event.hpp
     include/nova/sync/event/concepts.hpp
     include/nova/sync/event/detail/async_support.hpp
@@ -86,6 +86,7 @@ set(headers
 set(sources
     source/nova/sync/detail/syscall.cpp
     source/nova/sync/detail/timed_wait.cpp
+    source/nova/sync/futex/atomic_wait.cpp
     source/nova/sync/event/native_auto_reset_event.cpp
     source/nova/sync/event/native_manual_reset_event.cpp
     source/nova/sync/event/timed_auto_reset_event.cpp
@@ -132,6 +133,11 @@ if(WIN32)
         WIN32_LEAN_AND_MEAN
         _WIN32_WINNT=0x0a00
     )
+
+    target_link_libraries(nova_sync PRIVATE
+        Synchronization.lib
+        Kernel32.lib
+    )
 endif()

 # Testing
@@ -239,6 +245,7 @@ if(NOVA_SYNC_TESTS)
         test/event_test.cpp
         test/event_benchmarks.cpp
         test/event_async_asio_test.cpp
+        test/futex_test.cpp
         test/semaphore_types.hpp
         test/semaphore_test.cpp
         test/semaphore_benchmarks.cpp
@@ -294,7 +301,7 @@ if(NOVA_SYNC_TESTS)

     if (NOT NOVA_SYNC_TESTS_STRESS_TEST)
         catch_discover_tests(nova_sync_tests
-            TEST_FILTER ~[stress]
+            TEST_SPEC "~[stress]"
             PROPERTIES
                 TIMEOUT 20
         )
diff --git a/include/nova/sync/event/auto_reset_event.hpp b/include/nova/sync/event/auto_reset_event.hpp
index 4e41f52..e16e104 100644
--- a/include/nova/sync/event/auto_reset_event.hpp
+++ b/include/nova/sync/event/auto_reset_event.hpp
@@ -7,6 +7,7 @@
 #include <atomic>
 #include <cstdint>
+#include <nova/sync/futex/atomic_wait.hpp>

 namespace nova::sync {

@@ -42,7 +43,7 @@ class auto_reset_event
         if ( state_.compare_exchange_weak( s, s + 1, std::memory_order_release, std::memory_order_relaxed ) ) {
             if ( s < 0 )
-                state_.notify_all();
+                atomic_notify_all( state_ );
             return;
         }
     }
@@ -76,11 +77,46 @@ class auto_reset_event
         int32_t cur = state_.load( std::memory_order_relaxed );

         while ( cur <= my_slot ) {
-            state_.wait( cur, std::memory_order_relaxed );
+            atomic_wait( state_, cur, std::memory_order_acquire );
             cur = state_.load( std::memory_order_relaxed );
         }
+    }
+
+    template < class Clock, class Duration >
+    [[nodiscard]] bool try_wait_until( std::chrono::time_point< Clock, Duration > const& abs_time ) noexcept
+    {
+        if ( try_wait() )
+            return true;
+
+        int32_t prev = state_.fetch_sub( 1, std::memory_order_acquire );
+        if ( prev > 0 )
+            return true;
+
+        int32_t my_slot = prev - 1;
+        int32_t cur     = state_.load( std::memory_order_relaxed );
-        std::atomic_thread_fence( std::memory_order_acquire );
+        while ( cur <= my_slot ) {
+            if ( !atomic_wait_until( state_, cur, abs_time, std::memory_order_acquire ) ) {
+                // Timed out. Deregister with a CAS rather than a blind
+                // fetch_add, so that a signal landing between the timeout
+                // and the undo is not erased: if state_ moves past our
+                // slot first, that signal was ours and we report success.
+                cur = state_.load( std::memory_order_relaxed );
+                while ( cur <= my_slot ) {
+                    if ( state_.compare_exchange_weak( cur, cur + 1, std::memory_order_relaxed ) )
+                        return false;
+                }
+                return true;
+            }
+            cur = state_.load( std::memory_order_relaxed );
+        }
+
+        return true;
+    }
+
+    template < class Rep, class Period >
+    [[nodiscard]] bool try_wait_for( std::chrono::duration< Rep, Period > const& rel_time ) noexcept
+    {
+        return try_wait_until( std::chrono::steady_clock::now() + rel_time );
     }

 private:
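A quick usage sketch for the new timed interface (illustrative only, not part
of the patch; `stop` and `drain_queue` are hypothetical names):

    nova::sync::auto_reset_event ev;

    void consumer()
    {
        while ( !stop.load( std::memory_order_relaxed ) ) {
            // Bounded wait so the stop flag is re-checked periodically.
            if ( ev.try_wait_for( std::chrono::milliseconds( 100 ) ) )
                drain_queue();
        }
    }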
diff --git a/include/nova/sync/event/manual_reset_event.hpp b/include/nova/sync/event/manual_reset_event.hpp
index 9ae0ef9..6106cb0 100644
--- a/include/nova/sync/event/manual_reset_event.hpp
+++ b/include/nova/sync/event/manual_reset_event.hpp
@@ -4,8 +4,10 @@
 #pragma once

 #include <atomic>
+#include <chrono>
 #include <cstdint>
+#include <nova/sync/futex/atomic_wait.hpp>

 namespace nova::sync {

@@ -30,13 +32,13 @@ class manual_reset_event
     void signal() noexcept
     {
         if ( state_.exchange( 1u, std::memory_order_release ) == 0u )
-            state_.notify_all();
+            atomic_notify_all( state_ );
     }

     /// @brief Transitions the event back to "not set".
     void reset() noexcept
     {
-        state_.store( 0u, std::memory_order_release );
+        state_.store( 0u, std::memory_order_relaxed );
     }

     /// @brief Returns true if the event is currently set, without blocking.
@@ -55,9 +57,28 @@ class manual_reset_event
         // Park: block until the value is no longer 0.
         // Spurious wakeups are handled by the loop.
         while ( state_.load( std::memory_order_relaxed ) == 0u )
-            state_.wait( 0u, std::memory_order_relaxed );
+            atomic_wait( state_, 0u, std::memory_order_acquire );
+    }
+
+    template < class Clock, class Duration >
+    [[nodiscard]] bool try_wait_until( std::chrono::time_point< Clock, Duration > const& abs_time ) noexcept
+    {
+        if ( state_.load( std::memory_order_acquire ) != 0 )
+            return true;
+
+        while ( state_.load( std::memory_order_relaxed ) == 0 ) {
+            if ( !atomic_wait_until( state_, 0u, abs_time, std::memory_order_acquire ) ) {
+                return state_.load( std::memory_order_acquire ) != 0;
+            }
+        }

-        std::atomic_thread_fence( std::memory_order_acquire );
+        return true;
+    }
+
+    template < class Rep, class Period >
+    [[nodiscard]] bool try_wait_for( std::chrono::duration< Rep, Period > const& rel_time ) noexcept
+    {
+        return try_wait_until( std::chrono::steady_clock::now() + rel_time );
     }

 private:
diff --git a/include/nova/sync/event/timed_auto_reset_event.hpp b/include/nova/sync/event/timed_auto_reset_event.hpp
index 0e2b9be..9540214 100644
--- a/include/nova/sync/event/timed_auto_reset_event.hpp
+++ b/include/nova/sync/event/timed_auto_reset_event.hpp
@@ -11,7 +11,12 @@

 #include <semaphore>

+#if defined( __linux__ ) || defined( _WIN32 )
+#    include "nova/sync/event/auto_reset_event.hpp"
+#endif
+
 namespace nova::sync {
+namespace impl {

 /// @brief Auto-reset event with timed-wait support.
 ///
@@ -125,4 +130,17 @@ class timed_auto_reset_event
     std::counting_semaphore< max_waiters > sem_ { 0 };
 };

+} // namespace impl
+
+#if defined( __linux__ ) || defined( _WIN32 )
+
+using timed_auto_reset_event = auto_reset_event;
+
+#else
+
+using timed_auto_reset_event = impl::timed_auto_reset_event;
+
+#endif
+
 } // namespace nova::sync
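With the alias in place, caller code is platform-agnostic; a minimal sketch
(assumes `using namespace std::chrono_literals`):

    nova::sync::timed_auto_reset_event ev;  // futex-backed on Linux/Windows,
                                            // semaphore-based impl elsewhere

    bool signalled = ev.try_wait_until( std::chrono::steady_clock::now() + 50ms );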
diff --git a/include/nova/sync/event/timed_manual_reset_event.hpp b/include/nova/sync/event/timed_manual_reset_event.hpp
index 23205ae..dc64b3b 100644
--- a/include/nova/sync/event/timed_manual_reset_event.hpp
+++ b/include/nova/sync/event/timed_manual_reset_event.hpp
@@ -11,6 +11,12 @@

 #include <semaphore>

+#if defined( __linux__ ) || defined( _WIN32 )
+#    include "nova/sync/event/manual_reset_event.hpp"
+#endif
+
 namespace nova::sync {

 /// @brief Manual-reset event with timed-wait support.
 ///
 /// Once `signal()` is called, all waiters are woken and subsequent `wait()` /
 /// `try_wait()` calls return immediately until `reset()` is called.
+namespace impl {
+
 class timed_manual_reset_event
 {
     // -----------------------------------------------------------------------
@@ -140,4 +148,17 @@ class timed_manual_reset_event
     std::counting_semaphore< max_waiters > sem_ { 0 };
 };

+} // namespace impl
+
+#if defined( __linux__ ) || defined( _WIN32 )
+
+using timed_manual_reset_event = manual_reset_event;
+
+#else
+
+using timed_manual_reset_event = impl::timed_manual_reset_event;
+
+#endif
+
 } // namespace nova::sync
diff --git a/include/nova/sync/futex/atomic_wait.hpp b/include/nova/sync/futex/atomic_wait.hpp
new file mode 100644
index 0000000..f55395f
--- /dev/null
+++ b/include/nova/sync/futex/atomic_wait.hpp
@@ -0,0 +1,120 @@
+// SPDX-License-Identifier: MIT
+// SPDX-FileCopyrightText: 2026 Tim Blechmann
+
+#pragma once
+
+#include <atomic>
+#include <bit>
+#include <chrono>
+#include <cstdint>
+
+namespace nova::sync {
+
+// =============================================================================
+// int32_t overloads — primary implementation
+// =============================================================================
+
+/// @brief Blocks until the value of @p atom differs from @p old.
+///
+/// Spurious wakeups are possible; callers must re-check in a loop.
+///
+/// @param atom  The atomic to monitor.
+/// @param old   Expected value; if already different, returns immediately.
+/// @param order Memory ordering (`acquire`, `relaxed`, or `seq_cst`).
+void atomic_wait( std::atomic< int32_t >& atom,
+                  int32_t old,
+                  std::memory_order order = std::memory_order_seq_cst ) noexcept;
+
+/// @brief Blocks until the value differs from @p old or @p rel expires.
+///
+/// Spurious wakeups are possible; callers must re-check in a loop.
+///
+/// @param atom  The atomic to monitor.
+/// @param old   Expected value; if already different, returns immediately.
+/// @param rel   Relative timeout duration.
+/// @param order Memory ordering (`acquire`, `relaxed`, or `seq_cst`).
+/// @return `true` if the value was observed to differ from @p old, `false` on timeout.
+bool atomic_wait_for( std::atomic< int32_t >& atom,
+                      int32_t old,
+                      std::chrono::nanoseconds rel,
+                      std::memory_order order = std::memory_order_seq_cst ) noexcept;
+
+/// @brief Blocks until the value differs from @p old or @p deadline is reached.
+/// @param atom     The atomic to monitor.
+/// @param old      Expected value.
+/// @param deadline Absolute time point.
+/// @param order    Memory ordering.
+/// @return `true` if the value changed before the deadline, `false` on timeout.
+bool atomic_wait_until( std::atomic< int32_t >& atom,
+                        int32_t old,
+                        const std::chrono::time_point< std::chrono::steady_clock >& deadline,
+                        std::memory_order order = std::memory_order_seq_cst ) noexcept;
+
+/// @brief Blocks until the value differs from @p old or @p deadline is reached.
+/// @param atom     The atomic to monitor.
+/// @param old      Expected value.
+/// @param deadline Absolute time point (system clock / wall time).
+/// @param order    Memory ordering.
+/// @return `true` if the value changed before the deadline, `false` on timeout.
+bool atomic_wait_until( std::atomic< int32_t >& atom,
+                        int32_t old,
+                        const std::chrono::time_point< std::chrono::system_clock >& deadline,
+                        std::memory_order order = std::memory_order_seq_cst ) noexcept;
+
+/// @brief Wakes one thread waiting on this atomic.
+void atomic_notify_one( std::atomic< int32_t >& atom ) noexcept;
+
+/// @brief Wakes all threads waiting on this atomic.
+void atomic_notify_all( std::atomic< int32_t >& atom ) noexcept;
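+
+// Usage sketch (documentation only): because wakeups can be spurious, every
+// wait sits in a loop that re-checks the predicate before proceeding.
+//
+//     std::atomic< int32_t > flag { 0 };
+//
+//     // waiter
+//     int32_t observed = flag.load( std::memory_order_acquire );
+//     while ( observed == 0 ) {
+//         atomic_wait( flag, observed, std::memory_order_acquire );
+//         observed = flag.load( std::memory_order_acquire );
+//     }
+//
+//     // signaller
+//     flag.store( 1, std::memory_order_release );
+//     atomic_notify_all( flag );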
+
+// =============================================================================
+// uint32_t overloads — thin wrappers over the int32_t core
+//
+// Futex operates on 32-bit words regardless of signedness. These overloads
+// reinterpret the uint32_t atomic as int32_t so callers with unsigned state
+// (e.g. fast_mutex, fair_mutex, manual_reset_event) can use the same API
+// without manual casts.
+// =============================================================================
+
+inline void
+atomic_wait( std::atomic< uint32_t >& atom, uint32_t old, std::memory_order order = std::memory_order_seq_cst ) noexcept
+{
+    atomic_wait( reinterpret_cast< std::atomic< int32_t >& >( atom ), std::bit_cast< int32_t >( old ), order );
+}
+
+template < class Clock, class Duration >
+inline bool atomic_wait_until( std::atomic< uint32_t >& atom,
+                               uint32_t old,
+                               const std::chrono::time_point< Clock, Duration >& deadline,
+                               std::memory_order order = std::memory_order_seq_cst ) noexcept
+{
+    return atomic_wait_until( reinterpret_cast< std::atomic< int32_t >& >( atom ),
+                              std::bit_cast< int32_t >( old ),
+                              deadline,
+                              order );
+}
+
+inline bool atomic_wait_for( std::atomic< uint32_t >& atom,
+                             uint32_t old,
+                             std::chrono::nanoseconds rel,
+                             std::memory_order order = std::memory_order_seq_cst ) noexcept
+{
+    return atomic_wait_for( reinterpret_cast< std::atomic< int32_t >& >( atom ),
+                            std::bit_cast< int32_t >( old ),
+                            rel,
+                            order );
+}
+
+inline void atomic_notify_one( std::atomic< uint32_t >& atom ) noexcept
+{
+    atomic_notify_one( reinterpret_cast< std::atomic< int32_t >& >( atom ) );
+}
+
+inline void atomic_notify_all( std::atomic< uint32_t >& atom ) noexcept
+{
+    atomic_notify_all( reinterpret_cast< std::atomic< int32_t >& >( atom ) );
+}
+
+} // namespace nova::sync
diff --git a/include/nova/sync/mutex/eventfd_mutex.hpp b/include/nova/sync/mutex/eventfd_mutex.hpp
index 0027484..80fb183 100644
--- a/include/nova/sync/mutex/eventfd_mutex.hpp
+++ b/include/nova/sync/mutex/eventfd_mutex.hpp
@@ -47,7 +47,7 @@ class NOVA_SYNC_CAPABILITY( "mutex" ) eventfd_mutex
     /// @return `true` if the lock was acquired, `false` if the duration expired.
     bool try_lock_for( duration_type rel_ns ) noexcept NOVA_SYNC_TRY_ACQUIRE( true )
     {
-        if ( rel_ns.count() <= 0 )
+        if ( rel_ns <= std::chrono::nanoseconds::zero() )
             return try_lock();
         while ( !try_lock() ) {
             if ( !detail::ppoll_for( evfd_, rel_ns ) )
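One caveat worth recording for the uint32_t wrappers: the reinterpret_cast
between std::atomic specializations is a type-pun the standard does not
bless, though every mainstream ABI satisfies it. Guards making the layout
precondition explicit could look like this (sketch, not in the patch):

    static_assert( sizeof( std::atomic< uint32_t > ) == sizeof( std::atomic< int32_t > ) );
    static_assert( alignof( std::atomic< uint32_t > ) == alignof( std::atomic< int32_t > ) );
    static_assert( std::atomic< int32_t >::is_always_lock_free );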
diff --git a/include/nova/sync/mutex/fair_mutex.hpp b/include/nova/sync/mutex/fair_mutex.hpp
index 3f1ea2a..b3a625c 100644
--- a/include/nova/sync/mutex/fair_mutex.hpp
+++ b/include/nova/sync/mutex/fair_mutex.hpp
@@ -4,13 +4,23 @@
 #pragma once

 #include <atomic>
+#include <chrono>
 #include <cstdint>
+#include <nova/sync/futex/atomic_wait.hpp>
 #include

 namespace nova::sync {

 /// @brief Fair mutex with FIFO lock acquisition order (ticket lock).
+///
+/// @note `try_lock_for` / `try_lock_until` do **not** acquire a ticket and
+///       therefore do not participate in the FIFO order. They retry
+///       `try_lock()` (opportunistically grabbing the next ticket only when
+///       the lock is free) until the deadline, sleeping between attempts via
+///       the futex-based `atomic_wait_until`. This avoids ticket starvation
+///       but means timed waiters are not strictly fair against blocked
+///       `lock()` callers.
 class NOVA_SYNC_CAPABILITY( "mutex" ) fair_mutex
 {
 public:
@@ -23,9 +33,9 @@ class NOVA_SYNC_CAPABILITY( "mutex" ) fair_mutex
     /// @brief Acquires the lock in FIFO order.
     inline void lock() noexcept NOVA_SYNC_ACQUIRE()
     {
-        uint32_t my_ticket = next_ticket_.fetch_add( 1, std::memory_order_seq_cst );
+        uint32_t my_ticket = next_ticket_.fetch_add( 1, std::memory_order_relaxed );

-        if ( serving_ticket_.load( std::memory_order_seq_cst ) == my_ticket )
+        if ( serving_ticket_.load( std::memory_order_acquire ) == my_ticket )
             return;

         lock_slow( my_ticket );
@@ -49,10 +59,45 @@ class NOVA_SYNC_CAPABILITY( "mutex" ) fair_mutex
     {
         uint32_t next_serving = serving_ticket_.load( std::memory_order_relaxed ) + 1;

-        serving_ticket_.store( next_serving, std::memory_order_seq_cst );
+        serving_ticket_.store( next_serving, std::memory_order_release );

-        if ( next_ticket_.load( std::memory_order_seq_cst ) != next_serving )
-            serving_ticket_.notify_all();
+        // Always notify: both ticket-queue waiters (lock_slow) and timed
+        // waiters (try_lock_until) watch serving_ticket_.
+        atomic_notify_all( serving_ticket_ );
+    }
+
+    /// @brief Tries to acquire the lock within a relative timeout.
+    ///
+    /// Does not take a ticket; see class documentation for fairness notes.
+    /// @return `true` if lock acquired, `false` if timed out.
+    template < class Rep, class Period >
+    [[nodiscard]] bool try_lock_for( std::chrono::duration< Rep, Period > const& rel_time ) noexcept
+        NOVA_SYNC_TRY_ACQUIRE( true )
+    {
+        return try_lock_until( std::chrono::steady_clock::now() + rel_time );
+    }
+
+    /// @brief Tries to acquire the lock until an absolute deadline.
+    ///
+    /// Does not take a ticket; see class documentation for fairness notes.
+    /// @return `true` if lock acquired, `false` if timed out.
+    template < class Clock, class Duration >
+    [[nodiscard]] bool try_lock_until( std::chrono::time_point< Clock, Duration > const& abs_time ) noexcept
+        NOVA_SYNC_TRY_ACQUIRE( true )
+    {
+        while ( true ) {
+            if ( try_lock() )
+                return true;
+
+            uint32_t serving = serving_ticket_.load( std::memory_order_relaxed );
+
+            // Return false if already past deadline
+            if ( Clock::now() >= abs_time )
+                return false;
+
+            // Sleep until serving_ticket_ changes (= someone unlocked) or deadline
+            atomic_wait_until( serving_ticket_, serving, abs_time );
+        }
+    }

 private:
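The timed members make fair_mutex usable with the standard adapters; a short
sketch (assumes <mutex> and <chrono> are included):

    nova::sync::fair_mutex m;

    bool with_deadline()
    {
        // unique_lock's time_point constructor calls try_lock_until.
        std::unique_lock lk( m, std::chrono::steady_clock::now() + std::chrono::milliseconds( 10 ) );
        return lk.owns_lock();   // false if the deadline passed first
    }

Note the fairness caveat from the class comment applies: the timed path can
overtake threads blocked in lock().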
diff --git a/include/nova/sync/mutex/fast_mutex.hpp b/include/nova/sync/mutex/fast_mutex.hpp
index 66592b4..5c05864 100644
--- a/include/nova/sync/mutex/fast_mutex.hpp
+++ b/include/nova/sync/mutex/fast_mutex.hpp
@@ -4,7 +4,9 @@
 #pragma once

 #include <atomic>
+#include <chrono>
+#include <nova/sync/futex/atomic_wait.hpp>
 #include

 namespace nova::sync {

@@ -40,10 +42,32 @@ class NOVA_SYNC_CAPABILITY( "mutex" ) fast_mutex
     /// @brief Releases the lock and wakes one waiting thread if any.
     inline void unlock() noexcept NOVA_SYNC_RELEASE()
     {
-        uint32_t prev = state_.fetch_and( ~1, std::memory_order_release );
+        uint32_t prev = state_.fetch_and( ~1u, std::memory_order_release );
         if ( prev > 1 )
-            state_.notify_one();
+            atomic_notify_one( state_ );
+    }
+
+    /// @brief Tries to acquire the lock within a relative timeout.
+    /// @return `true` if lock acquired, `false` if timed out.
+    template < class Rep, class Period >
+    [[nodiscard]] bool try_lock_for( std::chrono::duration< Rep, Period > const& rel_time ) noexcept
+        NOVA_SYNC_TRY_ACQUIRE( true )
+    {
+        return try_lock_until( std::chrono::steady_clock::now() + rel_time );
+    }
+
+    /// @brief Tries to acquire the lock until an absolute deadline.
+    /// @return `true` if lock acquired, `false` if timed out.
+    template < class Clock, class Duration >
+    [[nodiscard]] bool try_lock_until( std::chrono::time_point< Clock, Duration > const& abs_time ) noexcept
+        NOVA_SYNC_TRY_ACQUIRE( true )
+    {
+        // Fast path
+        if ( try_lock() )
+            return true;
+
+        return lock_slow_until( abs_time );
     }

 private:
@@ -53,6 +77,45 @@ class NOVA_SYNC_CAPABILITY( "mutex" ) fast_mutex
     std::atomic< uint32_t > state_ { 0 };

     void lock_slow( uint32_t expected ) noexcept;
+
+    template < class Clock, class Duration >
+    bool lock_slow_until( std::chrono::time_point< Clock, Duration > const& abs_time ) noexcept
+    {
+        // Register as waiter
+        state_.fetch_add( 2, std::memory_order_relaxed );
+        uint32_t expected = state_.load( std::memory_order_relaxed );
+
+        while ( true ) {
+            if ( ( expected & 1 ) == 0 ) {
+                uint32_t desired = ( expected - 2 ) | 1;
+                if ( state_.compare_exchange_weak(
+                         expected, desired, std::memory_order_acquire, std::memory_order_relaxed ) )
+                    return true;
+                continue; // CAS failed, retry
+            }
+
+            if ( !atomic_wait_until( state_, expected, abs_time ) ) {
+                // Timed out — undo waiter registration
+                expected = state_.load( std::memory_order_relaxed );
+                while ( true ) {
+                    // Try to grab lock while unregistering
+                    if ( ( expected & 1 ) == 0 ) {
+                        uint32_t desired = ( expected - 2 ) | 1;
+                        if ( state_.compare_exchange_weak(
+                                 expected, desired, std::memory_order_acquire, std::memory_order_relaxed ) )
+                            return true; // grabbed it at last moment
+                        continue;
+                    }
+                    // Lock still held: just decrement waiter count
+                    if ( state_.compare_exchange_weak(
+                             expected, expected - 2, std::memory_order_relaxed, std::memory_order_relaxed ) )
+                        return false;
+                }
+            }
+
+            expected = state_.load( std::memory_order_relaxed );
+        }
+    }
 };

 } // namespace nova::sync
diff --git a/include/nova/sync/mutex/kqueue_mutex.hpp b/include/nova/sync/mutex/kqueue_mutex.hpp
index fa9e42b..a71658b 100644
--- a/include/nova/sync/mutex/kqueue_mutex.hpp
+++ b/include/nova/sync/mutex/kqueue_mutex.hpp
@@ -47,7 +47,7 @@ class NOVA_SYNC_CAPABILITY( "mutex" ) kqueue_mutex
     /// @return `true` if the lock was acquired, `false` if the duration expired.
     bool try_lock_for( duration_type rel_ns ) noexcept NOVA_SYNC_TRY_ACQUIRE( true )
     {
-        if ( rel_ns.count() <= 0 )
+        if ( rel_ns <= std::chrono::nanoseconds::zero() )
             return try_lock();
         while ( !try_lock() ) {
             if ( !detail::kevent_for( kqfd_, rel_ns ) )
diff --git a/include/nova/sync/semaphore/dispatch_semaphore.hpp b/include/nova/sync/semaphore/dispatch_semaphore.hpp
index c8c711c..09099c9 100644
--- a/include/nova/sync/semaphore/dispatch_semaphore.hpp
+++ b/include/nova/sync/semaphore/dispatch_semaphore.hpp
@@ -40,7 +40,7 @@ class dispatch_semaphore
     [[nodiscard]] bool try_acquire_for( std::chrono::duration< Rep, Period > const& rel_time ) noexcept
     {
         auto ns = std::chrono::duration_cast< std::chrono::nanoseconds >( rel_time );
-        if ( ns.count() <= 0 )
+        if ( ns <= std::chrono::nanoseconds::zero() )
             return try_acquire();
         dispatch_time_t timeout = dispatch_time( DISPATCH_TIME_NOW, int64_t( ns.count() ) );
         return dispatch_semaphore_wait( sem_, timeout ) == 0;
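For reference while reviewing lock_slow_until above, the fast_mutex state_
encoding (shared with the existing lock_slow):

    // bit 0      : 1 while the mutex is held
    // bits 1..31 : number of sleeping waiters
    //
    // fetch_add( 2 )          registers a waiter
    // ( expected - 2 ) | 1    deregisters and acquires in a single CAS
    // unlock: prev > 1        means at least one waiter needs a wake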
diff --git a/include/nova/sync/semaphore/eventfd_semaphore.hpp b/include/nova/sync/semaphore/eventfd_semaphore.hpp
index b33166d..4754a55 100644
--- a/include/nova/sync/semaphore/eventfd_semaphore.hpp
+++ b/include/nova/sync/semaphore/eventfd_semaphore.hpp
@@ -42,7 +42,7 @@ class eventfd_semaphore
     /// Blocks for up to @p rel_ns, attempting to acquire a token. Returns `true` if acquired, `false` if timed out.
     bool try_acquire_for( duration_type rel_ns ) noexcept
     {
-        if ( rel_ns.count() <= 0 )
+        if ( rel_ns <= std::chrono::nanoseconds::zero() )
             return try_acquire();
         while ( !try_acquire() ) {
             if ( !detail::ppoll_for( evfd_, rel_ns ) )
diff --git a/include/nova/sync/semaphore/fast_semaphore.hpp b/include/nova/sync/semaphore/fast_semaphore.hpp
index 17c07b9..351ca5f 100644
--- a/include/nova/sync/semaphore/fast_semaphore.hpp
+++ b/include/nova/sync/semaphore/fast_semaphore.hpp
@@ -6,10 +6,12 @@
 #include <algorithm>
 #include <atomic>
 #include <cassert>
+#include <chrono>
 #include <cstddef>
 #include <cstdint>
 #include
+#include <nova/sync/futex/atomic_wait.hpp>

 namespace nova::sync {

@@ -34,8 +36,10 @@ class fast_semaphore
         auto prev = count_.fetch_add( int32_t( n ), std::memory_order_release );
         if ( prev < 0 ) {
             auto to_wake = std::min( int32_t( n ), -prev );
-            for ( int32_t i = 0; i < to_wake; ++i )
+            if ( to_wake == 1 )
                 count_.notify_one();
+            else
+                count_.notify_all();
         }
     }

@@ -50,7 +54,7 @@ class fast_semaphore
             auto c = count_.load( std::memory_order_relaxed );
             if ( c >= 0 )
                 return;
-            count_.wait( c, std::memory_order_relaxed );
+            count_.wait( c, std::memory_order_acquire );
         }
     }

@@ -59,7 +63,7 @@ class fast_semaphore
     {
         auto c = count_.load( std::memory_order_relaxed );
         while ( c > 0 ) {
-            if ( count_.compare_exchange_weak( c, c - 1, std::memory_order_acquire, std::memory_order_relaxed ) )
+            if ( count_.compare_exchange_strong( c, c - 1, std::memory_order_acquire, std::memory_order_relaxed ) )
                 return true;
         }
         return false;
@@ -69,4 +73,99 @@ class fast_semaphore
     std::atomic< int32_t > count_;
 };

+
+/// Lock-free counting semaphore using atomic wait/notify. Supports timed waits.
+class fast_timed_semaphore
+{
+public:
+    explicit fast_timed_semaphore( std::ptrdiff_t initial = 0 ) noexcept :
+        count_( int32_t( initial ) )
+    {
+        assert( initial >= 0 );
+    }
+
+    ~fast_timed_semaphore()                                        = default;
+    fast_timed_semaphore( const fast_timed_semaphore& )            = delete;
+    fast_timed_semaphore& operator=( const fast_timed_semaphore& ) = delete;
+
+    /// Adds @p n tokens and wakes up to @p n blocked waiters.
+    void release( std::ptrdiff_t n = 1 ) noexcept
+    {
+        assert( n >= 0 );
+        auto prev = count_.fetch_add( int32_t( n ), std::memory_order_release );
+        if ( prev < 0 ) {
+            auto to_wake = std::min( int32_t( n ), -prev );
+            if ( to_wake == 1 )
+                atomic_notify_one( count_ );
+            else
+                atomic_notify_all( count_ );
+        }
+    }
+
+    /// Blocks until a token is available, then consumes one.
+    void acquire() noexcept
+    {
+        auto prev = count_.fetch_sub( 1, std::memory_order_acquire );
+        if ( prev > 0 )
+            return;
+
+        while ( true ) {
+            auto c = count_.load( std::memory_order_relaxed );
+            if ( c >= 0 )
+                return;
+            atomic_wait( count_, c, std::memory_order_acquire );
+        }
+    }
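+
+    // Protocol note: count_ >= 0 is the number of free tokens, count_ < 0
+    // the number of blocked waiters. acquire()'s fetch_sub claims either a
+    // token ( prev > 0 ) or a waiter slot ( prev <= 0 ), and release()
+    // compensates one-for-one, waking at most as many threads as it added
+    // tokens.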
+
+    /// Consumes a token if available. Returns `true` on success, `false` if none available.
+    [[nodiscard]] bool try_acquire() noexcept
+    {
+        auto c = count_.load( std::memory_order_relaxed );
+        while ( c > 0 ) {
+            if ( count_.compare_exchange_strong( c, c - 1, std::memory_order_acquire, std::memory_order_relaxed ) )
+                return true;
+        }
+        return false;
+    }
+
+    template < class Clock, class Duration >
+    [[nodiscard]] bool try_acquire_until( std::chrono::time_point< Clock, Duration > const& abs_time ) noexcept
+    {
+        if ( try_acquire() )
+            return true;
+
+        auto prev = count_.fetch_sub( 1, std::memory_order_acquire );
+        if ( prev > 0 )
+            return true;
+
+        while ( true ) {
+            auto c = count_.load( std::memory_order_relaxed );
+            if ( c >= 0 )
+                return true;
+
+            if ( !atomic_wait_until( count_, c, abs_time, std::memory_order_acquire ) ) {
+                // Timed out. Deregister with a CAS so a concurrent release
+                // cannot be lost: once count_ reaches zero or above, every
+                // waiter (including us) has been handed a token, and our
+                // original fetch_sub already consumed ours.
+                c = count_.load( std::memory_order_relaxed );
+                while ( c < 0 ) {
+                    if ( count_.compare_exchange_weak( c, c + 1, std::memory_order_relaxed ) )
+                        return false;
+                }
+                return true;
+            }
+        }
+    }
+
+    template < class Rep, class Period >
+    [[nodiscard]] bool try_acquire_for( std::chrono::duration< Rep, Period > const& rel_time ) noexcept
+    {
+        return try_acquire_until( std::chrono::steady_clock::now() + rel_time );
+    }
+
+private:
+    std::atomic< int32_t > count_;
+};
+
 } // namespace nova::sync
diff --git a/source/nova/sync/event/timed_auto_reset_event.cpp b/source/nova/sync/event/timed_auto_reset_event.cpp
index 623b4c0..4fb6f62 100644
--- a/source/nova/sync/event/timed_auto_reset_event.cpp
+++ b/source/nova/sync/event/timed_auto_reset_event.cpp
@@ -3,7 +3,7 @@

 #include "nova/sync/event/timed_auto_reset_event.hpp"

-namespace nova::sync {
+namespace nova::sync::impl {

 void timed_auto_reset_event::wait() noexcept
 {
@@ -52,4 +52,4 @@ bool timed_auto_reset_event::on_timed_wait_timeout() noexcept
     }
 }

-} // namespace nova::sync
+} // namespace nova::sync::impl
diff --git a/source/nova/sync/event/timed_manual_reset_event.cpp b/source/nova/sync/event/timed_manual_reset_event.cpp
index 7d984ca..d83bd20 100644
--- a/source/nova/sync/event/timed_manual_reset_event.cpp
+++ b/source/nova/sync/event/timed_manual_reset_event.cpp
@@ -3,7 +3,7 @@

 #include "nova/sync/event/timed_manual_reset_event.hpp"

-namespace nova::sync {
+namespace nova::sync::impl {

 bool timed_manual_reset_event::on_timed_wait_timeout() noexcept
 {
@@ -22,4 +22,4 @@ bool timed_manual_reset_event::on_timed_wait_timeout() noexcept
     }
 }

-} // namespace nova::sync
+} // namespace nova::sync::impl
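Usage sketch for the timed semaphore (illustrative; `consume_one` is a
hypothetical callback):

    nova::sync::fast_timed_semaphore sem { 0 };

    bool pop_with_timeout()
    {
        if ( !sem.try_acquire_for( std::chrono::milliseconds( 250 ) ) )
            return false;   // no token arrived within 250 ms
        consume_one();
        return true;
    }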
diff --git a/source/nova/sync/futex/atomic_wait.cpp b/source/nova/sync/futex/atomic_wait.cpp
new file mode 100644
index 0000000..728052a
--- /dev/null
+++ b/source/nova/sync/futex/atomic_wait.cpp
@@ -0,0 +1,433 @@
+// SPDX-License-Identifier: MIT
+// SPDX-FileCopyrightText: 2026 Tim Blechmann
+
+#include <nova/sync/futex/atomic_wait.hpp>
+
+#include <algorithm>
+#include <atomic>
+#include <chrono>
+#include <climits>
+#include <cstdint>
+
+// =============================================================================
+// Platform selection
+// =============================================================================
+#if defined( __linux__ )
+#    include <linux/futex.h>
+#    include <sys/syscall.h>
+#    include <sys/time.h>
+#    include <unistd.h>
+
+#    include <cerrno>
+#    include <ctime>
+
+#    define NOVA_SYNC_FUTEX_LINUX 1
+
+#elif defined( _WIN32 )
+#    include <windows.h>
+
+#    define NOVA_SYNC_FUTEX_WIN32 1
+
+#else
+// Portable fallback: hash-table of mutex + condvar buckets.
+#    include <array>
+#    include <condition_variable>
+#    include <mutex>
+
+#    define NOVA_SYNC_FUTEX_PORTABLE 1
+
+#endif
+
+using namespace std::chrono_literals;
+
+namespace nova::sync {
+
+// =============================================================================
+// Linux — native futex
+// =============================================================================
+#if defined( NOVA_SYNC_FUTEX_LINUX )
+
+namespace {
+
+int futex_syscall( std::atomic< int32_t >* addr, int op, int32_t val, const struct timespec* timeout, int32_t val3 ) noexcept
+{
+    return static_cast< int >(
+        ::syscall( SYS_futex, reinterpret_cast< int32_t* >( addr ), op, val, timeout, nullptr, val3 ) );
+}
+
+template < clockid_t ClockId >
+struct timespec clock_now_plus( std::chrono::nanoseconds ns ) noexcept
+{
+    struct timespec ts {};
+    ::clock_gettime( ClockId, &ts );
+    ts.tv_sec += time_t( ns.count() / 1'000'000'000LL );
+    ts.tv_nsec += long( ns.count() % 1'000'000'000LL );
+    if ( ts.tv_nsec >= 1'000'000'000L ) {
+        ts.tv_sec += 1;
+        ts.tv_nsec -= 1'000'000'000L;
+    }
+    return ts;
+}
+
+template < clockid_t ClockId >
+struct timespec to_abs_timespec( std::chrono::nanoseconds ns_since_epoch ) noexcept
+{
+    return timespec {
+        .tv_sec  = time_t( ns_since_epoch.count() / 1'000'000'000LL ),
+        .tv_nsec = long( ns_since_epoch.count() % 1'000'000'000LL ),
+    };
+}
+
+// For fence-based synchronization the relaxed load must be sequenced
+// *before* the acquire fence: per [atomics.fences], an acquire fence
+// synchronizes with a release operation only through an atomic read that
+// is sequenced before the fence and reads a value stored by the release
+// side. A fence issued ahead of the load would order nothing.
+inline bool acquire_and_check( std::atomic< int32_t >& atom, int32_t old, std::memory_order order ) noexcept
+{
+    int32_t observed = atom.load( std::memory_order_relaxed );
+    if ( order != std::memory_order_relaxed )
+        std::atomic_thread_fence( std::memory_order_acquire );
+    return observed != old;
+}
+
+} // namespace
+
+void atomic_wait( std::atomic< int32_t >& atom, int32_t old, std::memory_order order ) noexcept
+{
+    {
+        auto load_order = ( order != std::memory_order_relaxed ) ? std::memory_order_acquire : std::memory_order_relaxed;
+        if ( atom.load( load_order ) != old )
+            return;
+    }
+
+    while ( true ) {
+        int rc = futex_syscall( &atom, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, old, nullptr, 0 );
+
+        if ( rc == 0 || ( rc < 0 && errno == EAGAIN ) ) {
+            if ( acquire_and_check( atom, old, order ) )
+                return;
+            continue;
+        }
+        if ( rc < 0 && errno == EINTR )
+            continue;
+        return;
+    }
+}
+
+bool atomic_wait_for( std::atomic< int32_t >& atom,
+                      int32_t old,
+                      std::chrono::nanoseconds rel,
+                      std::memory_order order ) noexcept
+{
+    if ( rel <= 0ns )
+        return atom.load( std::memory_order_relaxed ) != old;
+
+    struct timespec abs_ts = clock_now_plus< CLOCK_MONOTONIC >( rel );
+
+    {
+        auto load_order = ( order != std::memory_order_relaxed ) ? std::memory_order_acquire : std::memory_order_relaxed;
+        if ( atom.load( load_order ) != old )
+            return true;
+    }
+
+    while ( true ) {
+        int rc = futex_syscall( &atom, FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG, old, &abs_ts, FUTEX_BITSET_MATCH_ANY );
+
+        if ( rc == 0 || ( rc < 0 && errno == EAGAIN ) ) {
+            if ( acquire_and_check( atom, old, order ) )
+                return true;
+            continue;
+        }
+        if ( rc < 0 && errno == ETIMEDOUT )
+            return atom.load( std::memory_order_relaxed ) != old;
+        if ( rc < 0 && errno == EINTR )
+            continue;
+        return false;
+    }
+}
+
+bool atomic_wait_until( std::atomic< int32_t >& atom,
+                        int32_t old,
+                        const std::chrono::time_point< std::chrono::steady_clock >& deadline,
+                        std::memory_order order ) noexcept
+{
+    auto ns                = std::chrono::duration_cast< std::chrono::nanoseconds >( deadline.time_since_epoch() );
+    struct timespec abs_ts = to_abs_timespec< CLOCK_MONOTONIC >( ns );
+
+    {
+        auto load_order = ( order != std::memory_order_relaxed ) ? std::memory_order_acquire : std::memory_order_relaxed;
+        if ( atom.load( load_order ) != old )
+            return true;
+    }
+
+    while ( true ) {
+        int rc = futex_syscall( &atom, FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG, old, &abs_ts, FUTEX_BITSET_MATCH_ANY );
+
+        if ( rc == 0 || ( rc < 0 && errno == EAGAIN ) ) {
+            if ( acquire_and_check( atom, old, order ) )
+                return true;
+            continue;
+        }
+        if ( rc < 0 && errno == ETIMEDOUT )
+            return atom.load( std::memory_order_relaxed ) != old;
+        if ( rc < 0 && errno == EINTR )
+            continue;
+        return false;
+    }
+}
+
+bool atomic_wait_until( std::atomic< int32_t >& atom,
+                        int32_t old,
+                        const std::chrono::time_point< std::chrono::system_clock >& deadline,
+                        std::memory_order order ) noexcept
+{
+    auto ns                = std::chrono::duration_cast< std::chrono::nanoseconds >( deadline.time_since_epoch() );
+    struct timespec abs_ts = to_abs_timespec< CLOCK_REALTIME >( ns );
+
+    {
+        auto load_order = ( order != std::memory_order_relaxed ) ? std::memory_order_acquire : std::memory_order_relaxed;
+        if ( atom.load( load_order ) != old )
+            return true;
+    }
+
+    while ( true ) {
+        int rc = futex_syscall(
+            &atom, FUTEX_WAIT_BITSET | FUTEX_CLOCK_REALTIME | FUTEX_PRIVATE_FLAG, old, &abs_ts, FUTEX_BITSET_MATCH_ANY );
+
+        if ( rc == 0 || ( rc < 0 && errno == EAGAIN ) ) {
+            if ( acquire_and_check( atom, old, order ) )
+                return true;
+            continue;
+        }
+        if ( rc < 0 && errno == ETIMEDOUT )
+            return atom.load( std::memory_order_relaxed ) != old;
+        if ( rc < 0 && errno == EINTR )
+            continue;
+        return false;
+    }
+}
+
+void atomic_notify_one( std::atomic< int32_t >& atom ) noexcept
+{
+    futex_syscall( &atom, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, 1, nullptr, 0 );
+}
+
+void atomic_notify_all( std::atomic< int32_t >& atom ) noexcept
+{
+    futex_syscall( &atom, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, INT_MAX, nullptr, 0 );
+}
+
+// =============================================================================
+// Windows — WaitOnAddress
+// =============================================================================
+#elif defined( NOVA_SYNC_FUTEX_WIN32 )
+
+void atomic_wait( std::atomic< int32_t >& atom, int32_t old, std::memory_order order ) noexcept
+{
+    {
+        auto load_order = ( order != std::memory_order_relaxed ) ? std::memory_order_acquire : std::memory_order_relaxed;
+        if ( atom.load( load_order ) != old )
+            return;
+    }
+
+    while ( true ) {
+        BOOL ok = ::WaitOnAddress( reinterpret_cast< volatile void* >( &atom ), &old, sizeof( int32_t ), INFINITE );
+        (void)ok;
+
+        // Load first, then fence: the acquire fence only orders reads that
+        // are sequenced before it.
+        int32_t observed = atom.load( std::memory_order_relaxed );
+        if ( order != std::memory_order_relaxed )
+            std::atomic_thread_fence( std::memory_order_acquire );
+        if ( observed != old )
+            return;
+    }
+}
+
+bool atomic_wait_for( std::atomic< int32_t >& atom,
+                      int32_t old,
+                      std::chrono::nanoseconds rel,
+                      std::memory_order order ) noexcept
+{
+    if ( rel <= 0ns )
+        return atom.load( std::memory_order_relaxed ) != old;
+
+    {
+        auto load_order = ( order != std::memory_order_relaxed ) ? std::memory_order_acquire : std::memory_order_relaxed;
+        if ( atom.load( load_order ) != old )
+            return true;
+    }
+
+    auto ms = std::chrono::duration_cast< std::chrono::milliseconds >( rel );
+    if ( ms.count() == 0 && rel.count() > 0 )
+        ms = std::chrono::milliseconds( 1 );
+
+    DWORD timeout_ms = static_cast< DWORD >( std::min< int64_t >( ms.count(), INFINITE - 1 ) );
+
+    BOOL ok = ::WaitOnAddress( reinterpret_cast< volatile void* >( &atom ), &old, sizeof( int32_t ), timeout_ms );
+    (void)ok;
+
+    int32_t observed = atom.load( std::memory_order_relaxed );
+    if ( order != std::memory_order_relaxed )
+        std::atomic_thread_fence( std::memory_order_acquire );
+    return observed != old;
+}
+
+bool atomic_wait_until( std::atomic< int32_t >& atom,
+                        int32_t old,
+                        const std::chrono::time_point< std::chrono::steady_clock >& deadline,
+                        std::memory_order order ) noexcept
+{
+    while ( true ) {
+        auto remaining = deadline - std::chrono::steady_clock::now();
+        if ( remaining <= 0ns )
+            return atom.load( std::memory_order_relaxed ) != old;
+
+        if ( atomic_wait_for( atom, old, remaining, order ) )
+            return true;
+
+        if ( atom.load( std::memory_order_relaxed ) != old )
+            return true;
+
+        if ( std::chrono::steady_clock::now() >= deadline )
+            return false;
+    }
+}
+
+bool atomic_wait_until( std::atomic< int32_t >& atom,
+                        int32_t old,
+                        const std::chrono::time_point< std::chrono::system_clock >& deadline,
+                        std::memory_order order ) noexcept
+{
+    // Re-arm after early returns until the deadline actually passes, so a
+    // spurious wakeup is not misreported as a timeout.
+    while ( true ) {
+        auto remaining = deadline - std::chrono::system_clock::now();
+        if ( remaining <= 0ns )
+            return atom.load( std::memory_order_relaxed ) != old;
+
+        if ( atomic_wait_for( atom, old, std::chrono::duration_cast< std::chrono::nanoseconds >( remaining ), order ) )
+            return true;
+    }
+}
+
+void atomic_notify_one( std::atomic< int32_t >& atom ) noexcept
+{
+    ::WakeByAddressSingle( reinterpret_cast< void* >( std::addressof( atom ) ) );
+}
+
+void atomic_notify_all( std::atomic< int32_t >& atom ) noexcept
+{
+    ::WakeByAddressAll( reinterpret_cast< void* >( std::addressof( atom ) ) );
+}
+
+// =============================================================================
+// Portable fallback — hash table of mutex + condvar buckets
+// =============================================================================
+#elif defined( NOVA_SYNC_FUTEX_PORTABLE )
+
+namespace {
+
+struct wait_bucket
+{
+    std::mutex              mutex;
+    std::condition_variable cv;
+};
+
+constexpr std::size_t bucket_count = 67;
+
+wait_bucket& bucket_for( const void* addr ) noexcept
+{
+    static std::array< wait_bucket, bucket_count > buckets;
+    auto h = reinterpret_cast< std::uintptr_t >( addr );
+    h ^= h >> 16;
+    h *= 0x9e3779b9u;
+    return buckets[ h % bucket_count ];
+}
+
+} // namespace
+
+void atomic_wait( std::atomic< int32_t >& atom, int32_t old, std::memory_order order ) noexcept
+{
+    atom.wait( old, order );
+}
+
+bool atomic_wait_for( std::atomic< int32_t >& atom,
+                      int32_t old,
+                      std::chrono::nanoseconds rel,
+                      std::memory_order order ) noexcept
+{
+    if ( rel <= 0ns )
+        return atom.load( std::memory_order_relaxed ) != old;
+
+    {
+        auto load_order = ( order != std::memory_order_relaxed ) ? std::memory_order_acquire : std::memory_order_relaxed;
+        if ( atom.load( load_order ) != old )
+            return true;
+    }
+
+    auto& b = bucket_for( &atom );
+    std::unique_lock lock( b.mutex );
+
+    if ( atom.load( std::memory_order_relaxed ) != old )
+        return true;
+
+    b.cv.wait_for( lock, rel, [ & ] {
+        return atom.load( std::memory_order_relaxed ) != old;
+    } );
+
+    return atom.load( std::memory_order_relaxed ) != old;
+}
+
+bool atomic_wait_until( std::atomic< int32_t >& atom,
+                        int32_t old,
+                        const std::chrono::time_point< std::chrono::steady_clock >& deadline,
+                        std::memory_order order ) noexcept
+{
+    {
+        auto load_order = ( order != std::memory_order_relaxed ) ? std::memory_order_acquire : std::memory_order_relaxed;
+        if ( atom.load( load_order ) != old )
+            return true;
+    }
+
+    auto& b = bucket_for( &atom );
+    std::unique_lock lock( b.mutex );
+
+    if ( atom.load( std::memory_order_relaxed ) != old )
+        return true;
+
+    b.cv.wait_until( lock, deadline, [ & ] {
+        return atom.load( std::memory_order_relaxed ) != old;
+    } );
+
+    return atom.load( std::memory_order_relaxed ) != old;
+}
+
+bool atomic_wait_until( std::atomic< int32_t >& atom,
+                        int32_t old,
+                        const std::chrono::time_point< std::chrono::system_clock >& deadline,
+                        std::memory_order order ) noexcept
+{
+    {
+        auto load_order = ( order != std::memory_order_relaxed ) ? std::memory_order_acquire : std::memory_order_relaxed;
+        if ( atom.load( load_order ) != old )
+            return true;
+    }
+
+    auto& b = bucket_for( &atom );
+    std::unique_lock lock( b.mutex );
+
+    if ( atom.load( std::memory_order_relaxed ) != old )
+        return true;
+
+    b.cv.wait_until( lock, deadline, [ & ] {
+        return atom.load( std::memory_order_relaxed ) != old;
+    } );
+
+    return atom.load( std::memory_order_relaxed ) != old;
+}
+
+void atomic_notify_one( std::atomic< int32_t >& atom ) noexcept
+{
+    atom.notify_one();
+
+    // Synchronize with timed waiters: they check the predicate under the
+    // bucket mutex before blocking, so taking and dropping the same mutex
+    // orders this notification after any in-progress check and prevents a
+    // lost wakeup.
+    auto& b = bucket_for( &atom );
+    {
+        std::lock_guard lock( b.mutex );
+    }
+    b.cv.notify_one();
+}
+
+void atomic_notify_all( std::atomic< int32_t >& atom ) noexcept
+{
+    atom.notify_all();
+
+    auto& b = bucket_for( &atom );
+    {
+        std::lock_guard lock( b.mutex );
+    }
+    b.cv.notify_all();
+}
+
+#endif // platform
+
+} // namespace nova::sync
diff --git a/source/nova/sync/mutex/fair_mutex.cpp b/source/nova/sync/mutex/fair_mutex.cpp
index c65a3b0..b91115d 100644
--- a/source/nova/sync/mutex/fair_mutex.cpp
+++ b/source/nova/sync/mutex/fair_mutex.cpp
@@ -26,7 +26,10 @@ void fair_mutex::lock_slow( uint32_t my_ticket ) noexcept
         if ( current_serving == my_ticket )
             return;

-        serving_ticket_.wait( current_serving, std::memory_order_relaxed );
+        // Bounded wait: the enclosing loop re-checks the ticket, so the
+        // 24 h cap only serves as a safety net against a lost wakeup.
+        atomic_wait_for( serving_ticket_, current_serving, std::chrono::hours( 24 ) );
     }
 }
+
 } // namespace nova::sync
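Why the portable notify functions above take and drop the bucket mutex before
signalling the condition variable (documentation sketch of the race):

    // waiter                              // notifier
    lock( bucket.mutex );                  atom.store( new_value );
    if ( atom == old )                     lock( bucket.mutex ); unlock();
        cv.wait_until( ... );              cv.notify_all();

Without the lock/unlock pair, the store and notify could land between the
waiter's predicate check and its cv.wait_until, and the wake would be lost
until the timeout.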
diff --git a/source/nova/sync/mutex/fast_mutex.cpp b/source/nova/sync/mutex/fast_mutex.cpp
index 1bae255..c551cd8 100644
--- a/source/nova/sync/mutex/fast_mutex.cpp
+++ b/source/nova/sync/mutex/fast_mutex.cpp
@@ -29,7 +29,7 @@ void fast_mutex::lock_slow( uint32_t expected ) noexcept

     // Spinning failed, now we need to sleep.
     // To register ourselves as a waiter, we add 2, which increments the waiter count in the upper 31 bits.
-    state_.fetch_add( 2, std::memory_order_acquire );
+    state_.fetch_add( 2, std::memory_order_relaxed );
     expected = state_.load( std::memory_order_relaxed );

     while ( true ) {
@@ -39,7 +39,7 @@ void fast_mutex::lock_slow( uint32_t expected ) noexcept
             if ( state_.compare_exchange_weak( expected, desired, std::memory_order_acquire, std::memory_order_relaxed ) )
                 return;
         } else {
-            state_.wait( expected, std::memory_order_relaxed );
+            atomic_wait_for( state_, expected, std::chrono::hours( 24 ) );
             expected = state_.load( std::memory_order_relaxed );
         }
     }
diff --git a/source/nova/sync/semaphore/kqueue_semaphore.cpp b/source/nova/sync/semaphore/kqueue_semaphore.cpp
index d9637c8..1c0f827 100644
--- a/source/nova/sync/semaphore/kqueue_semaphore.cpp
+++ b/source/nova/sync/semaphore/kqueue_semaphore.cpp
@@ -91,7 +91,7 @@ bool kqueue_semaphore::try_acquire() noexcept

 bool kqueue_semaphore::try_acquire_for( duration_type rel_ns ) noexcept
 {
-    if ( rel_ns.count() <= 0 )
+    if ( rel_ns <= std::chrono::nanoseconds::zero() )
         return try_acquire();

     if ( try_acquire() )
diff --git a/test/futex_test.cpp b/test/futex_test.cpp
new file mode 100644
index 0000000..d453120
--- /dev/null
+++ b/test/futex_test.cpp
@@ -0,0 +1,237 @@
+// SPDX-License-Identifier: MIT
+// SPDX-FileCopyrightText: 2026 Tim Blechmann
+
+#include <nova/sync/futex/atomic_wait.hpp>
+
+#include <catch2/catch_test_macros.hpp>
+
+#include <atomic>
+#include <chrono>
+#include <latch>
+#include <thread>
+
+using namespace std::chrono_literals;
+
+// =============================================================================
+// Basic correctness
+// =============================================================================
+
+TEST_CASE( "atomic_wait_for returns immediately when value differs", "[futex]" )
+{
+    std::atomic< int32_t > a { 42 };
+    // old != current → should return true instantly
+    bool ok = nova::sync::atomic_wait_for( a, 0, 100ms );
+    REQUIRE( ok );
+}
+
+TEST_CASE( "atomic_wait_for times out when value matches", "[futex]" )
+{
+    std::atomic< int32_t > a { 0 };
+    auto t0      = std::chrono::steady_clock::now();
+    bool ok      = nova::sync::atomic_wait_for( a, 0, 30ms );
+    auto elapsed = std::chrono::steady_clock::now() - t0;
+
+    REQUIRE( !ok );
+    // Should have waited at least ~30ms (allow 15ms slack for scheduling)
+    REQUIRE( elapsed >= 15ms );
+}
+
+TEST_CASE( "atomic_wait_for with zero timeout", "[futex]" )
+{
+    std::atomic< int32_t > a { 0 };
+    bool ok = nova::sync::atomic_wait_for( a, 0, 0ns );
+    REQUIRE( !ok );
+}
+
+TEST_CASE( "atomic_wait_for with negative timeout", "[futex]" )
+{
+    std::atomic< int32_t > a { 0 };
+    bool ok = nova::sync::atomic_wait_for( a, 0, -1ms );
+    REQUIRE( !ok );
+}
+
+// =============================================================================
+// Wakeup via notify
+// =============================================================================
+
+TEST_CASE( "atomic_wait_for wakes on notify_one", "[futex]" )
+{
+    std::atomic< int32_t > a { 0 };
+    std::latch             started { 1 };
+
+    std::thread waiter( [ & ] {
+        started.count_down();
+        bool ok = nova::sync::atomic_wait_for( a, 0, 2s );
+        REQUIRE( ok );
+        REQUIRE( a.load() != 0 );
+    } );
+
+    started.wait();
+    std::this_thread::sleep_for( 30ms );
+    a.store( 1, std::memory_order_relaxed );
+    nova::sync::atomic_notify_one( a );
+
+    waiter.join();
+}
+
+TEST_CASE( "atomic_wait_for wakes on notify_all", "[futex]" )
+{
+    std::atomic< int32_t > a { 0 };
+    const unsigned         n = std::max( 2u, std::thread::hardware_concurrency() );
+    std::latch             started { static_cast< std::ptrdiff_t >( n ) };
+    std::atomic< int >     woken { 0 };
+
+    std::vector< std::thread > threads;
+    threads.reserve( n );
+
+    for ( unsigned i = 0; i < n; ++i ) {
+        threads.emplace_back( [ & ] {
+            started.count_down();
+            bool ok = nova::sync::atomic_wait_for( a, 0, 2s );
+            if ( ok )
+                ++woken;
+        } );
+    }
+
+    started.wait();
+    std::this_thread::sleep_for( 30ms );
+    a.store( 1, std::memory_order_relaxed );
+    nova::sync::atomic_notify_all( a );
+
+    for ( auto& t : threads )
+        t.join();
+
+    REQUIRE( woken.load() == static_cast< int >( n ) );
+}
+
+// =============================================================================
+// Absolute deadline overloads
+// =============================================================================
+
+TEST_CASE( "atomic_wait_until (steady_clock) times out", "[futex]" )
+{
+    std::atomic< int32_t > a { 0 };
+    auto deadline = std::chrono::steady_clock::now() + 30ms;
+
+    bool ok = nova::sync::atomic_wait_until( a, 0, deadline );
+    REQUIRE( !ok );
+    REQUIRE( std::chrono::steady_clock::now() >= deadline - 5ms );
+}
+
+TEST_CASE( "atomic_wait_until (steady_clock) wakes on notify", "[futex]" )
+{
+    std::atomic< int32_t > a { 0 };
+    std::latch             started { 1 };
+
+    std::thread waiter( [ & ] {
+        started.count_down();
+        auto deadline = std::chrono::steady_clock::now() + 2s;
+        bool ok       = nova::sync::atomic_wait_until( a, 0, deadline );
+        REQUIRE( ok );
+    } );
+
+    started.wait();
+    std::this_thread::sleep_for( 30ms );
+    a.store( 1, std::memory_order_relaxed );
+    nova::sync::atomic_notify_one( a );
+
+    waiter.join();
+}
+
+TEST_CASE( "atomic_wait_until (system_clock) times out", "[futex]" )
+{
+    std::atomic< int32_t > a { 0 };
+    auto deadline = std::chrono::system_clock::now() + 30ms;
+
+    bool ok = nova::sync::atomic_wait_until( a, 0, deadline );
+    REQUIRE( !ok );
+}
+
+TEST_CASE( "atomic_wait_until (system_clock) wakes on notify", "[futex]" )
+{
+    std::atomic< int32_t > a { 0 };
+    std::latch             started { 1 };
+
+    std::thread waiter( [ & ] {
+        started.count_down();
+        auto deadline = std::chrono::system_clock::now() + 2s;
+        bool ok       = nova::sync::atomic_wait_until( a, 0, deadline );
+        REQUIRE( ok );
+    } );
+
+    started.wait();
+    std::this_thread::sleep_for( 30ms );
+    a.store( 1, std::memory_order_relaxed );
+    nova::sync::atomic_notify_one( a );
+
+    waiter.join();
+}
+
+// =============================================================================
+// Stress: multiple waiters with different atomics (no false sharing of buckets)
+// =============================================================================
+
+TEST_CASE( "atomic_wait_for stress - independent atomics", "[futex][stress]" )
+{
+    constexpr int iterations = 200;
+    std::atomic< int32_t > a { 0 };
+    std::atomic< int32_t > b { 0 };
+
+    std::thread ta( [ & ] {
+        for ( int i = 0; i < iterations; ++i ) {
+            while ( a.load( std::memory_order_relaxed ) == i )
+                nova::sync::atomic_wait_for( a, i, 100ms );
+        }
+    } );
+
+    std::thread tb( [ & ] {
+        for ( int i = 0; i < iterations; ++i ) {
+            while ( b.load( std::memory_order_relaxed ) == i )
+                nova::sync::atomic_wait_for( b, i, 100ms );
+        }
+    } );
+
+    for ( int i = 1; i <= iterations; ++i ) {
+        a.store( i, std::memory_order_relaxed );
+        nova::sync::atomic_notify_one( a );
+        b.store( i, std::memory_order_relaxed );
+        nova::sync::atomic_notify_one( b );
+    }
+
+    ta.join();
+    tb.join();
+
+    REQUIRE( a.load() == iterations );
+    REQUIRE( b.load() == iterations );
+}
"[futex][stress]" ) +{ + constexpr int rounds = 500; + std::atomic< int32_t > turn { 0 }; // 0 = thread A's turn, 1 = thread B's turn + + std::thread a( [ & ] { + for ( int i = 0; i < rounds; ++i ) { + while ( turn.load( std::memory_order_relaxed ) != 0 ) + nova::sync::atomic_wait_for( turn, 1, 100ms ); + turn.store( 1, std::memory_order_relaxed ); + nova::sync::atomic_notify_one( turn ); + } + } ); + + std::thread b( [ & ] { + for ( int i = 0; i < rounds; ++i ) { + while ( turn.load( std::memory_order_relaxed ) != 1 ) + nova::sync::atomic_wait_for( turn, 0, 100ms ); + turn.store( 0, std::memory_order_relaxed ); + nova::sync::atomic_notify_one( turn ); + } + } ); + + a.join(); + b.join(); +} diff --git a/test/mutex_test.cpp b/test/mutex_test.cpp index a80989a..1f669bd 100644 --- a/test/mutex_test.cpp +++ b/test/mutex_test.cpp @@ -380,6 +380,9 @@ TEST_CASE( "mutex: fair_mutex FIFO ordering", "[mutex]" ) #ifdef NOVA_SYNC_TIMED_MUTEX_TYPES +static_assert( nova::sync::concepts::timed_mutex< nova::sync::fast_mutex > ); +static_assert( nova::sync::concepts::timed_mutex< nova::sync::fair_mutex > ); + TEMPLATE_TEST_CASE( "mutex: timed locking", "[mutex]", NOVA_SYNC_TIMED_MUTEX_TYPES ) { using mutex_t = TestType; diff --git a/test/mutex_types.hpp b/test/mutex_types.hpp index 483c812..e13ac18 100644 --- a/test/mutex_types.hpp +++ b/test/mutex_types.hpp @@ -135,21 +135,33 @@ NOVA_SYNC_HAS_PTHREAD_SPINLOCK_arg \ // clang-format off #ifdef NOVA_SYNC_HAS_KQUEUE_MUTEX # define NOVA_SYNC_TIMED_MUTEX_TYPES \ + nova::sync::fast_mutex, \ + nova::sync::fair_mutex, \ nova::sync::kqueue_mutex, \ nova::sync::fast_kqueue_mutex \ NOVA_SYNC_HAS_PTHREAD_RT_MUTEX_arg #elif defined( NOVA_SYNC_HAS_EVENTFD_MUTEX ) # define NOVA_SYNC_TIMED_MUTEX_TYPES \ + nova::sync::fast_mutex, \ + nova::sync::fair_mutex, \ nova::sync::eventfd_mutex, \ nova::sync::fast_eventfd_mutex \ NOVA_SYNC_HAS_PTHREAD_RT_MUTEX_arg #elif defined( _WIN32 ) # define NOVA_SYNC_TIMED_MUTEX_TYPES \ + nova::sync::fast_mutex, \ + nova::sync::fair_mutex, \ nova::sync::win32_event_mutex, \ nova::sync::win32_mutex #elif defined( NOVA_SYNC_HAS_PTHREAD_RT_MUTEX ) # define NOVA_SYNC_TIMED_MUTEX_TYPES \ + nova::sync::fast_mutex, \ + nova::sync::fair_mutex, \ nova::sync::pthread_priority_inherit_mutex +#else +# define NOVA_SYNC_TIMED_MUTEX_TYPES \ + nova::sync::fast_mutex, \ + nova::sync::fair_mutex #endif // clang-format on diff --git a/test/semaphore_test.cpp b/test/semaphore_test.cpp index 576f659..01a48e4 100644 --- a/test/semaphore_test.cpp +++ b/test/semaphore_test.cpp @@ -10,7 +10,6 @@ #include #include -#include #include #include @@ -18,6 +17,7 @@ using namespace std::chrono_literals; // Validate concepts for portable types. static_assert( nova::sync::concepts::counting_semaphore< nova::sync::fast_semaphore > ); +static_assert( nova::sync::concepts::timed_counting_semaphore< nova::sync::fast_timed_semaphore > ); // ============================================================================= // Shared helpers