Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ set(headers
include/nova/sync/detail/async_support.hpp
include/nova/sync/detail/backoff.hpp
include/nova/sync/detail/compat.hpp
include/nova/sync/detail/native_handle_support.hpp
include/nova/sync/detail/pause.hpp
include/nova/sync/detail/syscall.hpp
include/nova/sync/detail/timed_wait.hpp
Expand Down Expand Up @@ -216,8 +217,8 @@ if(NOVA_SYNC_TESTS)
)
endif()

# Enable Clang's static thread-safety analysis for the library target.
# (Condition must be unquoted so MATCHES evaluates the variable, and
# target_compile_options — not add_compile_options — takes a target name.)
if(CMAKE_CXX_COMPILER_ID MATCHES Clang OR CMAKE_CXX_COMPILER_ID MATCHES AppleClang)
target_compile_options(nova_sync PRIVATE -Wthread-safety)
endif()

add_executable(nova_sync_tests
Expand Down
52 changes: 39 additions & 13 deletions include/nova/sync/detail/syscall.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#pragma once

#include <chrono>
#include <span>

#if __has_include( <unistd.h> ) && __has_include( <poll.h> ) && __has_include( <errno.h> )
# include <cstddef>
Expand All @@ -15,20 +16,20 @@
namespace nova::sync::detail {

// Retry a POSIX read() across EINTR. Returns the same return value as read().
inline ssize_t read_intr( int fd, void* buf, size_t count ) noexcept
inline ssize_t read_intr( int fd, std::span< std::byte > buf ) noexcept
{
ssize_t rc;
do {
rc = ::read( fd, buf, count );
rc = ::read( fd, buf.data(), buf.size() );
} while ( rc < 0 && errno == EINTR );
return rc;
}

inline ssize_t write_intr( int fd, const void* buf, size_t count ) noexcept
inline ssize_t write_intr( int fd, std::span< const std::byte > buf ) noexcept
{
const unsigned char* p = static_cast< const unsigned char* >( buf );
size_t remaining = count;
size_t offset = 0;
const std::byte* p = buf.data();
size_t remaining = buf.size();
size_t offset = 0;

while ( remaining > 0 ) {
ssize_t rc = ::write( fd, p + offset, remaining );
Expand All @@ -42,25 +43,45 @@ inline ssize_t write_intr( int fd, const void* buf, size_t count ) noexcept
offset += rc;
}

return ssize_t( count );
return ssize_t( buf.size() );
}
inline int poll_intr( struct pollfd* fds, nfds_t nfds ) noexcept

inline int poll_intr( std::span< struct pollfd > fds ) noexcept
{
int rc;
do {
rc = ::poll( fds, nfds, -1 );
rc = ::poll( fds.data(), nfds_t( fds.size() ), -1 );
} while ( rc < 0 && errno == EINTR );
return rc;
}

inline int poll_intr( struct pollfd* fds, nfds_t nfds, std::chrono::milliseconds timeout ) noexcept
inline int poll_intr( struct pollfd& fd ) noexcept
{
return poll_intr( std::span< struct pollfd >( &fd, 1 ) );
}


// Non-blocking poll(): zero timeout, no EINTR retry (an interrupted
// zero-timeout poll can simply report "nothing ready"). Returns the
// number of fds with non-zero revents, 0 if none, or -1 on error.
// Note: the previous revision declared an unused steady_clock start
// time here; that dead code is removed.
inline int try_poll( std::span< struct pollfd > fds ) noexcept
{
return ::poll( fds.data(), nfds_t( fds.size() ), 0 );
}

inline int try_poll( struct pollfd& fd ) noexcept
{
return try_poll( std::span< struct pollfd >( &fd, 1 ) );
}

inline int poll_intr( std::span< struct pollfd > fds, std::chrono::milliseconds timeout ) noexcept
{
if ( timeout <= std::chrono::milliseconds::zero() )
return try_poll( fds );

using clock = std::chrono::steady_clock;
const auto start = clock::now();

while ( true ) {
int rem = int( std::chrono::duration_cast< std::chrono::milliseconds >( timeout ).count() );
int rc = ::poll( fds, nfds, rem );
int rc = ::poll( fds.data(), nfds_t( fds.size() ), rem );
if ( rc < 0 ) {
if ( errno == EINTR ) {
auto elapsed = clock::now() - start;
Expand All @@ -74,6 +95,11 @@ inline int poll_intr( struct pollfd* fds, nfds_t nfds, std::chrono::milliseconds
}
}

inline int poll_intr( struct pollfd& fd, std::chrono::milliseconds timeout ) noexcept
{
return poll_intr( std::span< struct pollfd >( &fd, 1 ), timeout );
}

} // namespace nova::sync::detail

#endif
Expand Down
4 changes: 2 additions & 2 deletions include/nova/sync/event/support/libdispatch_support.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ struct dispatch_event_cancel_state

// Mark this state cancelled (seq_cst store pairing with the seq_cst load
// in is_cancelled()), then run the registered callback, if any.
void cancel() noexcept
{
cancelled.store( true, std::memory_order_seq_cst );
if ( cancellation_callback_ )
cancellation_callback_();
}

// Observe the cancellation flag (seq_cst, pairing with cancel()).
bool is_cancelled() const noexcept
{
return cancelled.load( std::memory_order_seq_cst );
}

bool try_mark_invoked() noexcept
Expand Down
4 changes: 2 additions & 2 deletions include/nova/sync/mutex/support/libdispatch_support.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,13 @@ struct dispatch_cancel_state

// Mark this state cancelled (seq_cst store pairing with the seq_cst load
// in is_cancelled()), then run the registered callback, if any.
void cancel() noexcept
{
cancelled.store( true, std::memory_order_seq_cst );
if ( cancellation_callback_ )
cancellation_callback_();
}
// Observe the cancellation flag (seq_cst, pairing with cancel()).
bool is_cancelled() const noexcept
{
return cancelled.load( std::memory_order_seq_cst );
}
bool try_mark_invoked() noexcept
{
Expand Down
2 changes: 2 additions & 0 deletions include/nova/sync/semaphore/detail/async_support.hpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#pragma once

/// Semaphore async handlers invoke with `expected<void, error_code>`.
/// Semaphore acquire completes synchronously; no lock guard returned (cf. events).
///
Expand Down
4 changes: 2 additions & 2 deletions include/nova/sync/semaphore/support/libdispatch_support.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,14 @@ struct dispatch_semaphore_cancel_state

// Mark this state cancelled (seq_cst store pairing with the seq_cst load
// in is_cancelled()), then run the registered callback, if any.
void cancel() noexcept
{
cancelled.store( true, std::memory_order_seq_cst );
if ( cancellation_callback_ )
cancellation_callback_();
}

// Observe the cancellation flag (seq_cst, pairing with cancel()).
bool is_cancelled() const noexcept
{
return cancelled.load( std::memory_order_seq_cst );
}

bool try_mark_invoked() noexcept
Expand Down
24 changes: 12 additions & 12 deletions source/nova/sync/event/native_auto_reset_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ native_auto_reset_event::native_auto_reset_event( bool initially_set ) noexcept
#elif defined( __linux__ )
handle_.reset( ::eventfd( 0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE ) );
if ( initially_set ) {
uint64_t val = 1;
detail::write_intr( handle_.get(), &val, sizeof( val ) );
const std::array< uint64_t, 1 > val { 1 };
detail::write_intr( handle_.get(), std::as_bytes( std::span( val ) ) );
}
#elif defined( __APPLE__ )
handle_.reset( ::kqueue() );
Expand Down Expand Up @@ -88,12 +88,12 @@ void native_auto_reset_event::signal() noexcept
// Only apply idempotency check if no threads are waiting
if ( wait_count_.load( std::memory_order_acquire ) == 0 ) {
struct pollfd pfd = { handle_.get(), POLLIN, 0 };
if ( detail::poll_intr( &pfd, 1, 0ms ) > 0 ) {
if ( detail::try_poll( pfd ) > 0 ) {
return; // Already set, don't write
}
}
uint64_t val = 1;
detail::write_intr( handle_.get(), &val, sizeof( val ) );
std::array< uint64_t, 1 > val { 1 };
detail::write_intr( handle_.get(), std::as_bytes( std::span( val ) ) );
#elif defined( __APPLE__ )
// Strategy:
// • token_count_ tracks unconsumed tokens.
Expand Down Expand Up @@ -133,7 +133,7 @@ void native_auto_reset_event::signal() noexcept
// trigger once a token is consumed.
#else
uint8_t dummy = 1;
detail::write_intr( fds_[ 1 ].get(), &dummy, 1 );
detail::write_intr( fds_[ 1 ].get(), std::as_bytes( std::span( &dummy, 1 ) ) );
#endif
}

Expand All @@ -142,8 +142,8 @@ bool native_auto_reset_event::try_wait() noexcept
#if defined( _WIN32 )
return ::WaitForSingleObject( handle_.get(), 0 ) == WAIT_OBJECT_0;
#elif defined( __linux__ )
uint64_t val;
return detail::read_intr( handle_.get(), &val, sizeof( val ) ) == ssize_t( sizeof( val ) );
std::array< uint64_t, 1 > val {};
return detail::read_intr( handle_.get(), as_writable_bytes( std::span( val ) ) ) == sizeof( uint64_t );
#elif defined( __APPLE__ )
// Attempt to consume one token via CAS.
int s = token_count_.load( std::memory_order_acquire );
Expand All @@ -155,7 +155,7 @@ bool native_auto_reset_event::try_wait() noexcept
#else
uint8_t buf[ 128 ];
bool signaled = false;
while ( detail::read_intr( fds_[ 0 ].get(), buf, sizeof( buf ) ) > 0 )
while ( detail::read_intr( fds_[ 0 ].get(), std::as_writable_bytes( std::span( buf ) ) ) > 0 )
signaled = true;
return signaled;
#endif
Expand All @@ -169,7 +169,7 @@ void native_auto_reset_event::wait() noexcept
wait_count_.fetch_add( 1, std::memory_order_acquire );
while ( !try_wait() ) {
struct pollfd pfd = { handle_.get(), POLLIN, 0 };
detail::poll_intr( &pfd, 1 );
detail::poll_intr( pfd );
}
wait_count_.fetch_sub( 1, std::memory_order_release );
#elif defined( __APPLE__ )
Expand Down Expand Up @@ -206,7 +206,7 @@ void native_auto_reset_event::wait() noexcept
#else
while ( !try_wait() ) {
struct pollfd pfd = { native_handle(), POLLIN, 0 };
detail::poll_intr( &pfd, 1 );
detail::poll_intr( pfd );
}
#endif
}
Expand Down Expand Up @@ -255,7 +255,7 @@ bool native_auto_reset_event::try_wait_for( duration_type timeout ) noexcept
return result;
# else
struct pollfd pfd = { native_handle(), POLLIN, 0 };
detail::poll_intr( &pfd, 1, timeout );
detail::poll_intr( pfd, timeout );
return try_wait();
# endif
#endif
Expand Down
24 changes: 13 additions & 11 deletions source/nova/sync/event/native_manual_reset_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#if defined( _WIN32 )
# include <windows.h>
#else
# include <array>
# include <fcntl.h>
# include <poll.h>
# include <unistd.h>
Expand Down Expand Up @@ -67,19 +68,19 @@ void native_manual_reset_event::signal() noexcept
::SetEvent( handle_.get() );
#elif defined( __linux__ )
struct pollfd pfd = { handle_.get(), POLLIN, 0 };
if ( detail::poll_intr( &pfd, 1, 0ms ) == 0 ) {
uint64_t val = 1;
detail::write_intr( handle_.get(), &val, sizeof( val ) );
if ( detail::try_poll( pfd ) == 0 ) {
const std::array< uint64_t, 1 > val { 1 };
detail::write_intr( handle_.get(), std::as_bytes( std::span( val ) ) );
}
#elif defined( __APPLE__ )
struct kevent ev;
EV_SET( &ev, 1, EVFILT_USER, 0, NOTE_TRIGGER, 0, nullptr );
::kevent( handle_.get(), &ev, 1, nullptr, 0, nullptr );
#else
struct pollfd pfd = { fds_[ 0 ].get(), POLLIN, 0 };
if ( detail::poll_intr( &pfd, 1, 0ms ) == 0 ) {
if ( detail::try_poll( pfd ) == 0 ) {
uint8_t dummy = 1;
detail::write_intr( fds_[ 1 ].get(), &dummy, 1 );
detail::write_intr( fds_[ 1 ].get(), std::as_bytes( std::span( &dummy, 1 ) ) );
}
#endif
}
Expand All @@ -89,8 +90,9 @@ void native_manual_reset_event::reset() noexcept
#if defined( _WIN32 )
::ResetEvent( handle_.get() );
#elif defined( __linux__ )
uint64_t val;
while ( detail::read_intr( handle_.get(), &val, sizeof( val ) ) == ssize_t( sizeof( val ) ) ) {}
std::array< uint64_t, 1 > val {};
while ( detail::read_intr( handle_.get(), std::as_writable_bytes( std::span( val ) ) )
== ssize_t( sizeof( uint64_t ) ) ) {}
#elif defined( __APPLE__ )
struct kevent ev;
EV_SET( &ev, 1, EVFILT_USER, EV_DELETE, 0, 0, nullptr );
Expand All @@ -99,7 +101,7 @@ void native_manual_reset_event::reset() noexcept
::kevent( handle_.get(), &ev, 1, nullptr, 0, nullptr );
#else
uint8_t buf[ 128 ];
while ( detail::read_intr( fds_[ 0 ].get(), buf, sizeof( buf ) ) > 0 ) {}
while ( detail::read_intr( fds_[ 0 ].get(), std::as_writable_bytes( std::span( buf ) ) ) > 0 ) {}
#endif
}

Expand All @@ -109,7 +111,7 @@ bool native_manual_reset_event::try_wait() const noexcept
return ::WaitForSingleObject( handle_.get(), 0 ) == WAIT_OBJECT_0;
#else
struct pollfd pfd = { native_handle(), POLLIN, 0 };
int rc = detail::poll_intr( &pfd, 1, 0ms );
int rc = detail::try_poll( pfd );
return rc > 0;
#endif
}
Expand All @@ -121,7 +123,7 @@ void native_manual_reset_event::wait() const noexcept
#else
while ( !try_wait() ) {
struct pollfd pfd = { native_handle(), POLLIN, 0 };
detail::poll_intr( &pfd, 1 );
detail::poll_intr( pfd );
}
#endif
}
Expand All @@ -144,7 +146,7 @@ bool native_manual_reset_event::try_wait_for( duration_type timeout ) const noex
return false;
# else
struct pollfd pfd = { native_handle(), POLLIN, 0 };
int rc = detail::poll_intr( &pfd, 1, timeout );
int rc = detail::poll_intr( pfd, timeout );
if ( rc <= 0 )
return try_wait();
return try_wait();
Expand Down
12 changes: 10 additions & 2 deletions source/nova/sync/futex/atomic_wait.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,8 @@ bool atomic_wait_for( std::atomic< int32_t >& atom,
return atom.load( std::memory_order_relaxed ) != old;
} );

if ( order != std::memory_order_relaxed )
std::atomic_thread_fence( std::memory_order_acquire );
return atom.load( std::memory_order_relaxed ) != old;
}

Expand All @@ -387,6 +389,8 @@ bool atomic_wait_until( std::atomic< int32_t >&
return atom.load( std::memory_order_relaxed ) != old;
} );

if ( order != std::memory_order_relaxed )
std::atomic_thread_fence( std::memory_order_acquire );
return atom.load( std::memory_order_relaxed ) != old;
}

Expand All @@ -411,20 +415,24 @@ bool atomic_wait_until( std::atomic< int32_t >&
return atom.load( std::memory_order_relaxed ) != old;
} );

if ( order != std::memory_order_relaxed )
std::atomic_thread_fence( std::memory_order_acquire );
return atom.load( std::memory_order_relaxed ) != old;
}

// Wake one waiter: notify native atomic waiters, then notify the
// condition-variable bucket (taken under its mutex so the wakeup cannot
// race a waiter between its predicate check and its cv wait).
// NOTE(review): diff artifact removed — the `auto& b = ...` line appeared
// twice (old/new whitespace variants), which would be a redefinition.
void atomic_notify_one( std::atomic< int32_t >& atom ) noexcept
{
atom.notify_one();
auto& b = bucket_for( &atom );
std::lock_guard lock( b.mutex );
b.cv.notify_one();
}

// Wake all waiters: notify native atomic waiters, then notify the
// condition-variable bucket (taken under its mutex so the wakeup cannot
// race a waiter between its predicate check and its cv wait).
// NOTE(review): diff artifact removed — the `auto& b = ...` line appeared
// twice (old/new whitespace variants), which would be a redefinition.
void atomic_notify_all( std::atomic< int32_t >& atom ) noexcept
{
atom.notify_all();
auto& b = bucket_for( &atom );
std::lock_guard lock( b.mutex );
b.cv.notify_all();
}

Expand Down
Loading