-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathEventDispatcher.cppm
More file actions
120 lines (101 loc) · 4.41 KB
/
EventDispatcher.cppm
File metadata and controls
120 lines (101 loc) · 4.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// Copyright (C) 2026 mxreal64
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://gnu.org>.
export module EventDispatcher;
import <atomic>;
import <cstddef>;
import <cstdint>;
import <memory>;
import <new>;
import <utility>;
// A single cell of the ring buffer: uninitialised bytes large enough for one
// EventType, followed by an atomic sequence counter. Both the struct and the
// counter are 64-byte aligned, so the payload bytes and the counter start on
// different 64-byte boundaries.
export template <typename EventType>
struct alignas(64) EventSlot {
    // Raw bytes only: the slot never default-constructs an EventType itself.
    alignas(EventType) std::byte event_storage[sizeof(EventType)];

    // Zero-initialised here; the owner of the slot assigns it a meaning.
    alignas(64) std::atomic<std::uint64_t> sequence{0};
};
export template <typename EventType, std::size_t Capacity>
class NanosecondDispatcher {
private:
static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of two.");
static constexpr std::size_t Mask = Capacity - 1;
alignas(64) EventSlot<EventType> ring_buffer_[Capacity];
alignas(64) std::atomic<uint64_t> producer_sequence_{0};
alignas(64) std::atomic<uint64_t> consumer_sequence_{0};
static inline void cpu_relax() noexcept {
#if defined(__x86_64__) || defined(_M_X64)
__builtin_ia32_pause();
#elif defined(__aarch64__)
asm volatile("yield" ::: "memory");
#endif
}
public:
NanosecondDispatcher() {
for (std::size_t i = 0; i < Capacity; ++i) {
ring_buffer_[i].sequence.store(i, std::memory_order_relaxed);
}
}
~NanosecondDispatcher() {
// Safe explicit destruction of any remaining unconsumed elements
uint64_t head = consumer_sequence_.load(std::memory_order_relaxed);
uint64_t tail = producer_sequence_.load(std::memory_order_relaxed);
for (uint64_t i = head; i < tail; ++i) {
auto* slot = &ring_buffer_[i & Mask];
std::destroy_at(reinterpret_cast<EventType*>(slot->event_storage));
}
}
NanosecondDispatcher(const NanosecondDispatcher&) = delete;
NanosecondDispatcher& operator=(const NanosecondDispatcher&) = delete;
template <typename... Args>
void publish(Args&&... args) noexcept {
uint64_t ticket = producer_sequence_.load(std::memory_order_relaxed);
EventSlot<EventType>* slot = nullptr;
while (true) {
slot = &ring_buffer_[ticket & Mask];
uint64_t seq = slot->sequence.load(std::memory_order_acquire);
int64_t diff = static_cast<int64_t>(seq) - static_cast<int64_t>(ticket);
if (diff == 0) {
if (producer_sequence_.compare_exchange_weak(ticket, ticket + 1, std::memory_order_relaxed, std::memory_order_relaxed)) {
break;
}
} else {
cpu_relax();
ticket = producer_sequence_.load(std::memory_order_relaxed);
}
}
::new (static_cast<void*>(slot->event_storage)) EventType(std::forward<Args>(args)...);
slot->sequence.store(ticket + 1, std::memory_order_release);
}
template <typename EventHandler>
void consume_next(EventHandler&& handler) noexcept {
uint64_t ticket = consumer_sequence_.load(std::memory_order_relaxed);
EventSlot<EventType>* slot = nullptr;
while (true) {
slot = &ring_buffer_[ticket & Mask];
uint64_t seq = slot->sequence.load(std::memory_order_acquire);
int64_t diff = static_cast<int64_t>(seq) - static_cast<int64_t>(ticket + 1);
if (diff == 0) {
if (consumer_sequence_.compare_exchange_weak(ticket, ticket + 1, std::memory_order_relaxed, std::memory_order_relaxed)) {
break;
}
} else {
cpu_relax();
ticket = consumer_sequence_.load(std::memory_order_relaxed);
}
}
auto* item = reinterpret_cast<EventType*>(slot->event_storage);
std::forward<EventHandler>(handler)(*item);
std::destroy_at(item);
slot->sequence.store(ticket + Capacity, std::memory_order_release);
}
};