Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ set(headers
include/nova/sync/detail/pause.hpp
include/nova/sync/detail/syscall.hpp
include/nova/sync/detail/timed_wait.hpp

include/nova/sync/futex/atomic_wait.hpp
include/nova/sync/event/auto_reset_event.hpp
include/nova/sync/event/concepts.hpp
include/nova/sync/event/detail/async_support.hpp
Expand Down Expand Up @@ -86,6 +86,7 @@ set(headers
set(sources
source/nova/sync/detail/syscall.cpp
source/nova/sync/detail/timed_wait.cpp
source/nova/sync/futex/atomic_wait.cpp
source/nova/sync/event/native_auto_reset_event.cpp
source/nova/sync/event/native_manual_reset_event.cpp
source/nova/sync/event/timed_auto_reset_event.cpp
Expand Down Expand Up @@ -132,6 +133,11 @@ if(WIN32)
WIN32_LEAN_AND_MEAN
_WIN32_WINNT=0x0a00
)

target_link_libraries(nova_sync PRIVATE
Synchronization.lib
Kernel32.lib
)
endif()

# Testing
Expand Down Expand Up @@ -239,6 +245,7 @@ if(NOVA_SYNC_TESTS)
test/event_test.cpp
test/event_benchmarks.cpp
test/event_async_asio_test.cpp
test/futex_test.cpp
test/semaphore_types.hpp
test/semaphore_test.cpp
test/semaphore_benchmarks.cpp
Expand Down Expand Up @@ -294,7 +301,7 @@ if(NOVA_SYNC_TESTS)

if (NOT NOVA_SYNC_TESTS_STRESS_TEST)
catch_discover_tests(nova_sync_tests
TEST_FILTER ~[stress]
TEST_SPEC "~[stress]"
PROPERTIES
TIMEOUT 20
)
Expand Down
41 changes: 38 additions & 3 deletions include/nova/sync/event/auto_reset_event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <nova/sync/detail/backoff.hpp>
#include <nova/sync/detail/compat.hpp>
#include <nova/sync/futex/atomic_wait.hpp>

namespace nova::sync {

Expand Down Expand Up @@ -42,7 +43,7 @@ class auto_reset_event

if ( state_.compare_exchange_weak( s, s + 1, std::memory_order_release, std::memory_order_relaxed ) ) {
if ( s < 0 )
state_.notify_all();
atomic_notify_all( state_ );
return;
}
}
Expand Down Expand Up @@ -76,11 +77,45 @@ class auto_reset_event
int32_t cur = state_.load( std::memory_order_relaxed );

while ( cur <= my_slot ) {
state_.wait( cur, std::memory_order_relaxed );
atomic_wait( state_, cur, std::memory_order_acquire );
cur = state_.load( std::memory_order_relaxed );
}
}

template < class Clock, class Duration >
[[nodiscard]] bool try_wait_until( std::chrono::time_point< Clock, Duration > const& abs_time ) noexcept
{
if ( try_wait() )
return true;

int32_t prev = state_.fetch_sub( 1, std::memory_order_acquire );
if ( prev > 0 )
return true;

int32_t my_slot = prev - 1;
int32_t cur = state_.load( std::memory_order_relaxed );

std::atomic_thread_fence( std::memory_order_acquire );
while ( cur <= my_slot ) {
if ( !atomic_wait_until( state_, cur, abs_time, std::memory_order_acquire ) ) {
// Timeout — but check one more time
cur = state_.load( std::memory_order_relaxed );
if ( cur > my_slot ) {
return true;
}
// Must undo our wait registration: add 1 back
state_.fetch_add( 1, std::memory_order_relaxed );
return false;
}
cur = state_.load( std::memory_order_relaxed );
}

return true;
}

template < class Rep, class Period >
[[nodiscard]] bool try_wait_for( std::chrono::duration< Rep, Period > const& rel_time ) noexcept
{
return try_wait_until( std::chrono::steady_clock::now() + rel_time );
}

private:
Expand Down
29 changes: 25 additions & 4 deletions include/nova/sync/event/manual_reset_event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
#pragma once

#include <atomic>
#include <chrono>

#include <nova/sync/detail/compat.hpp>
#include <nova/sync/futex/atomic_wait.hpp>

namespace nova::sync {

Expand All @@ -30,13 +32,13 @@ class manual_reset_event
void signal() noexcept
{
if ( state_.exchange( 1u, std::memory_order_release ) == 0u )
state_.notify_all();
atomic_notify_all( state_ );
}

/// @brief Transitions the event back to "not set".
void reset() noexcept
{
state_.store( 0u, std::memory_order_release );
state_.store( 0u, std::memory_order_relaxed );
}

/// @brief Returns true if the event is currently set, without blocking.
Expand All @@ -55,9 +57,28 @@ class manual_reset_event
// Park: block until the value is no longer 0.
// Spurious wakeups are handled by the loop.
while ( state_.load( std::memory_order_relaxed ) == 0u )
state_.wait( 0u, std::memory_order_relaxed );
atomic_wait( state_, 0u, std::memory_order_acquire );
}

template < class Clock, class Duration >
[[nodiscard]] bool try_wait_until( std::chrono::time_point< Clock, Duration > const& abs_time ) noexcept
{
if ( state_.load( std::memory_order_acquire ) != 0 )
return true;

while ( state_.load( std::memory_order_relaxed ) == 0 ) {
if ( !atomic_wait_until( state_, 0u, abs_time, std::memory_order_acquire ) ) {
return state_.load( std::memory_order_acquire ) != 0;
}
}

std::atomic_thread_fence( std::memory_order_acquire );
return true;
}

template < class Rep, class Period >
[[nodiscard]] bool try_wait_for( std::chrono::duration< Rep, Period > const& rel_time ) noexcept
{
return try_wait_until( std::chrono::steady_clock::now() + rel_time );
}

private:
Expand Down
18 changes: 18 additions & 0 deletions include/nova/sync/event/timed_auto_reset_event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@

#include <nova/sync/detail/compat.hpp>

#if defined( __linux__ ) || defined( _WIN32 )
# include "nova/sync/event/auto_reset_event.hpp"
#endif

namespace nova::sync {
namespace impl {

/// @brief Auto-reset event with timed-wait support.
///
Expand Down Expand Up @@ -125,4 +130,17 @@ class timed_auto_reset_event
std::counting_semaphore< max_waiters > sem_ { 0 };
};

} // namespace impl

#if defined( __linux__ ) || defined( _WIN32 )

using timed_auto_reset_event = auto_reset_event;

#else

using timed_auto_reset_event = impl::timed_auto_reset_event;

#endif


} // namespace nova::sync
21 changes: 21 additions & 0 deletions include/nova/sync/event/timed_manual_reset_event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,21 @@

#include <nova/sync/detail/compat.hpp>

#if defined( __linux__ ) || defined( _WIN32 )
# include "nova/sync/event/manual_reset_event.hpp"
#else

#endif

namespace nova::sync {

/// @brief Manual-reset event with timed-wait support.
///
/// Once `signal()` is called, all waiters are woken and subsequent `wait()` /
/// `try_wait()` calls return immediately until `reset()` is called.

namespace impl {

class timed_manual_reset_event
{
// -----------------------------------------------------------------------
Expand Down Expand Up @@ -140,4 +148,17 @@ class timed_manual_reset_event
std::counting_semaphore< max_waiters > sem_ { 0 };
};

} // namespace impl

#if defined( __linux__ ) || defined( _WIN32 )

using timed_manual_reset_event = manual_reset_event;

#else

using timed_manual_reset_event = impl::timed_manual_reset_event;

#endif


} // namespace nova::sync
120 changes: 120 additions & 0 deletions include/nova/sync/futex/atomic_wait.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2026 Tim Blechmann

#pragma once

#include <atomic>
#include <bit>
#include <chrono>
#include <cstdint>

namespace nova::sync {

// =============================================================================
// int32_t overloads — primary implementation
// =============================================================================

/// @brief Blocks until the value of @p atom differs from @p old.
///
/// Spurious wakeups are possible; callers must re-check in a loop.
///
/// @param atom The atomic to monitor.
/// @param old Expected value; if already different, returns immediately.
/// @param order Memory ordering (`acquire`, `relaxed`, or `seq_cst`).
void atomic_wait( std::atomic< int32_t >& atom,
int32_t old,
std::memory_order order = std::memory_order_seq_cst ) noexcept;

/// @brief Blocks until the value differs from @p old or @p rel expires.
///
/// Spurious wakeups are possible; callers must re-check in a loop.
///
/// @param atom The atomic to monitor.
/// @param old Expected value; if already different, returns immediately.
/// @param rel Relative timeout duration.
/// @param order Memory ordering (`acquire`, `relaxed`, or `seq_cst`).
/// @return `true` if woken (value changed or spurious), `false` on timeout.
bool atomic_wait_for( std::atomic< int32_t >& atom,
int32_t old,
std::chrono::nanoseconds rel,
std::memory_order order = std::memory_order_seq_cst ) noexcept;

/// @brief Blocks until the value differs from @p old or @p deadline is reached.
/// @param atom The atomic to monitor.
/// @param old Expected value.
/// @param deadline Absolute time point.
/// @param order Memory ordering.
/// @return `true` if woken, `false` on timeout.
bool atomic_wait_until( std::atomic< int32_t >& atom,
int32_t old,
const std::chrono::time_point< std::chrono::steady_clock >& deadline,
std::memory_order order = std::memory_order_seq_cst ) noexcept;

/// @brief Blocks until the value differs from @p old or @p deadline is reached.
/// @param atom The atomic to monitor.
/// @param old Expected value.
/// @param deadline Absolute time point (system clock / wall time).
/// @param order Memory ordering.
/// @return `true` if woken, `false` on timeout.
bool atomic_wait_until( std::atomic< int32_t >& atom,
int32_t old,
const std::chrono::time_point< std::chrono::system_clock >& deadline,
std::memory_order order = std::memory_order_seq_cst ) noexcept;

/// @brief Wakes one thread waiting on this atomic.
/// @param order Memory ordering (`release` or `seq_cst`).
void atomic_notify_one( std::atomic< int32_t >& atom ) noexcept;

/// @brief Wakes all threads waiting on this atomic.
/// @param order Memory ordering (`release` or `seq_cst`).
void atomic_notify_all( std::atomic< int32_t >& atom ) noexcept;

// =============================================================================
// uint32_t overloads — thin wrappers over the int32_t core
//
// Futex operates on 32-bit words regardless of signedness. These overloads
// reinterpret the uint32_t atomic as int32_t so callers with unsigned state
// (e.g. fast_mutex, fair_mutex, manual_reset_event) can use the same API
// without manual casts.
// =============================================================================

inline void
atomic_wait( std::atomic< uint32_t >& atom, uint32_t old, std::memory_order order = std::memory_order_seq_cst ) noexcept
{
atomic_wait( reinterpret_cast< std::atomic< int32_t >& >( atom ), std::bit_cast< int32_t >( old ), order );
}

template < class Clock, class Duration >
inline bool atomic_wait_until( std::atomic< uint32_t >& atom,
uint32_t old,
const std::chrono::time_point< Clock, Duration >& deadline,
std::memory_order order = std::memory_order_seq_cst ) noexcept
{
return atomic_wait_until( reinterpret_cast< std::atomic< int32_t >& >( atom ),
std::bit_cast< int32_t >( old ),
deadline,
order );
}

inline bool atomic_wait_for( std::atomic< uint32_t >& atom,
uint32_t old,
std::chrono::nanoseconds rel,
std::memory_order order = std::memory_order_seq_cst ) noexcept
{
return atomic_wait_for( reinterpret_cast< std::atomic< int32_t >& >( atom ),
std::bit_cast< int32_t >( old ),
rel,
order );
}

inline void atomic_notify_one( std::atomic< uint32_t >& atom ) noexcept
{
atomic_notify_one( reinterpret_cast< std::atomic< int32_t >& >( atom ) );
}

inline void atomic_notify_all( std::atomic< uint32_t >& atom ) noexcept
{
atomic_notify_all( reinterpret_cast< std::atomic< int32_t >& >( atom ) );
}

} // namespace nova::sync
2 changes: 1 addition & 1 deletion include/nova/sync/mutex/eventfd_mutex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class NOVA_SYNC_CAPABILITY( "mutex" ) eventfd_mutex
/// @return `true` if the lock was acquired, `false` if the duration expired.
bool try_lock_for( duration_type rel_ns ) noexcept NOVA_SYNC_TRY_ACQUIRE( true )
{
if ( rel_ns.count() <= 0 )
if ( rel_ns <= std::chrono::nanoseconds::zero() )
return try_lock();
while ( !try_lock() ) {
if ( !detail::ppoll_for( evfd_, rel_ns ) )
Expand Down
Loading