CeresEngine 0.2.0
A game development framework
Loading...
Searching...
No Matches
Channel.hpp
Go to the documentation of this file.
1//
2// CeresEngine - A game development framework
3//
4// Created by Rogiel Sulzbach.
5// Copyright (c) 2018-2022 Rogiel Sulzbach. All rights reserved.
6//
7
8#pragma once
9
12
15
16#include <continuable/continuable.hpp>
17
18#include <cassert>
19#include <mutex>
20#include <optional>
21
22namespace CeresEngine {
23
42 template<typename T, typename Lockable = NullLockable> class Channel final {
43 // Channels can only work with value types.
44 static_assert(std::is_reference_v<T> == false, "Using reference for channel is forbidden.");
45
47 using CoroutineHandle = typename std::coroutine_handle<void>;
48
49 public:
51 using Pointer = T*;
52
54 using Reference = T&;
55
57 using Mutex = Lockable;
58
60 class Reader;
61
63 class Writer;
64
65 private:
68
69 template<typename NodeType> class LinkedList;
70
73
76
77 public:
79 Channel() = default;
80
82 Channel(const Channel&) = delete;
83
85 Channel(Channel&&) = delete;
86
88 Channel& operator=(const Channel&) = delete;
89
92
95
96 public:
100 [[nodiscard]] Writer write(T value) noexcept { return Writer{*this, std::forward<T>(value)}; }
101
104 [[nodiscard]] Reader read() noexcept { return Reader{*this}; }
105
106 private:
109 [[nodiscard]] static constexpr void* poison() noexcept { return nullptr; }
110
113 template<typename NodeType> class LinkedList final {
115 NodeType* head = nullptr;
116
118 NodeType* tail = nullptr;
119
120 public:
123
124 public:
127 [[nodiscard]] bool empty() const noexcept { return head == nullptr; }
128
131 void push(NodeType* node) noexcept {
132 if(tail) {
133 tail->next = node;
134 tail = node;
135 } else {
136 head = tail = node;
137 }
138 }
139
143 NodeType* node = head;
144 if(head == tail) { // empty or 1
145 head = tail = nullptr;
146 } else { // 2 or more
147 head = head->next;
148 }
149
150 return node; // this can be nullptr
151 }
152 };
153
154 public:
156 class Reader final {
158 friend Channel;
159
160 private:
162 mutable Optional<T> value = std::nullopt;
163
166
167 union {
169 Reader* next = nullptr;
170
173 };
174
175 private:
178 explicit Reader(Channel& ch) noexcept : channel(std::addressof(ch)) {}
179
180 public:
182 Reader(const Reader&) = delete;
183
185 Reader& operator=(const Reader&) = delete;
186
189 Reader(Reader&& rhs) noexcept {
190 std::swap(value, rhs.value);
191 std::swap(handle, rhs.handle);
192 std::swap(channel, rhs.channel);
193 }
194
198 Reader& operator=(Reader&& rhs) noexcept {
199 std::swap(value, rhs.value);
200 std::swap(handle, rhs.handle);
201 std::swap(channel, rhs.channel);
202 return *this;
203 }
204
207
208 public:
209 bool await_ready() const&;
212 };
213
217 friend Channel;
218
219 private:
221 mutable Optional<T> value = std::nullopt;
222
225
226 union {
228 Writer* next = nullptr;
229
232 };
233
234 private:
239 explicit Writer(Channel& channel, T&& value) noexcept : value(std::forward<T>(value)), channel(std::addressof(channel)) {}
240
241 public:
243 Writer(const Writer&) = delete;
244
246 Writer& operator=(const Writer&) = delete;
247
250 Writer(Writer&& rhs) noexcept {
251 std::swap(value, rhs.value);
252 std::swap(handle, rhs.handle);
253 std::swap(channel, rhs.channel);
254 }
255
259 Writer& operator=(Writer&& rhs) noexcept {
260 std::swap(value, rhs.value);
261 std::swap(handle, rhs.handle);
262 std::swap(channel, rhs.channel);
263 return *this;
264 }
265
268
269 public:
270 bool await_ready() const&;
272 bool await_resume() &;
273 };
274 };
275
277 // Because of thread scheduling,
278 // Some coroutines can be enqueued into list just after
279 // this destructor unlocks.
280 //
281 // But this can't be detected at once since
282 // we have 2 list in the channel...
283 //
284 // Current implementation triggers scheduling repeatedly
285 // to reduce the possibility. As repeat count becomes greater,
286 // the possibility drops to zero. But notice that it is NOT zero.
287 //
288 size_t repeat = 1; // recommend 5'000+ repeat for hazard usage
289 do {
291
292 while(!writers.empty()) {
293 Writer* w = writers.pop();
294 auto rh = w->handle;
295 w->handle = std::nullopt;
296
297 rh->resume();
298 }
299 while(!readers.empty()) {
300 Reader* r = readers.pop();
301 auto rh = r->handle;
302 r->handle = std::nullopt;
303
304 rh->resume();
305 }
306 } while(repeat--);
307 }
308
309 template<typename T, typename Lockable> bool Channel<T, Lockable>::Reader::await_ready() const& {
311 if(channel->writers.empty())
312 CE_UNLIKELY { return false; }
313
314 Writer* w = channel->writers.pop();
315 // assert(w != nullptr);
316 // assert(w->ptr != nullptr);
317 // assert(w->handle != nullptr);
318
319 // exchange address & resumeable_handle
320 std::swap(this->value, w->value);
321 std::swap(this->handle, w->handle);
322
323 return true;
324 }
325
326 template<typename T, typename Lockable> void Channel<T, Lockable>::Reader::await_suspend(CoroutineHandle coro) & {
327 // notice that next & chan are sharing memory
328 Channel& ch = *(this->channel);
329
330 this->handle = coro; // remember handle before push/unlock
331 this->next = nullptr; // clear to prevent confusing
332
333 ch.readers.push(this); // push to channel
334 // ch.mutex.unlock();
335 }
336
337 template<typename T, typename Lockable> Optional<T> Channel<T, Lockable>::Reader::await_resume() & {
338 // frame holds poision if the channel is going to destroy
339 if(!handle)
340 CE_UNLIKELY { return std::nullopt; }
341
342 // Store first. we have to do this
343 // because the resume operation can destroy the writer coroutine
344 T value = std::move(this->value.get());
345 if(const auto coroutine = *handle) {
346 // assert(this->handle != nullptr);
347 // assert(*reinterpret_cast<UInt64*>(handle) != 0);
348 coroutine.resume();
349 }
350
351 return std::move(value);
352 }
353
354 template<typename T, typename Lockable> bool Channel<T, Lockable>::Writer::await_ready() const& {
356 if(channel->readers.empty())
357 CE_UNLIKELY { return false; }
358
359 Reader* r = channel->readers.pop();
360 // exchange address & resumeable_handle
361 std::swap(this->value, r->value);
362 std::swap(this->handle, r->handle);
363
364 return true;
365 }
366
367 template<typename T, typename Lockable> void Channel<T, Lockable>::Writer::await_suspend(CoroutineHandle coro) & {
368 // notice that next & chan are sharing memory
369 Channel& ch = *(this->channel);
370
371 this->handle = coro; // remember handle before push/unlock
372 this->next = nullptr; // clear to prevent confusing
373
374 ch.writers.push(this); // push to channel
375 // ch.mutex.unlock();
376 }
377
378 template<typename T, typename Lockable> bool Channel<T, Lockable>::Writer::await_resume() & {
379 // frame holds poision if the channel is going to destroy
380 if(!handle)
381 CE_UNLIKELY { return false; }
382
383 if(const auto coroutine = *handle) {
384 // assert(this->handle != nullptr);
385 // assert(*reinterpret_cast<UInt64*>(handle) != 0);
386 coroutine.resume();
387 }
388 return true;
389 }
390
391} // namespace CeresEngine
#define CE_UNLIKELY
Definition Macros.hpp:397
Minimal linked list without node allocation.
Definition Channel.hpp:113
NodeType * head
The head node.
Definition Channel.hpp:115
void push(NodeType *node) noexcept
Pushes a new node into the list.
Definition Channel.hpp:131
LinkedList() noexcept=default
Creates a new LinkedList.
NodeType * tail
The tail node.
Definition Channel.hpp:118
NodeType * pop() noexcept
Pops a node from the list.
Definition Channel.hpp:142
bool empty() const noexcept
Checks if the list is empty.
Definition Channel.hpp:127
Awaitable reader for Channel
Definition Channel.hpp:156
Channel * channel
The channel to push this reader.
Definition Channel.hpp:172
~Reader() noexcept=default
Destroys the reader.
Optional< T > value
The address of value.
Definition Channel.hpp:162
Reader & operator=(Reader &&rhs) noexcept
Assigns the reader by moving the contents of another.
Definition Channel.hpp:198
Reader & operator=(const Reader &)=delete
Deleted copy assignment operator.
Reader(Reader &&rhs) noexcept
Creates a new reader by moving the contents of another.
Definition Channel.hpp:189
bool await_ready() const &
Definition Channel.hpp:309
Reader(const Reader &)=delete
Deleted copy constructor.
void await_suspend(CoroutineHandle coro) &
Definition Channel.hpp:326
friend Channel
Definition Channel.hpp:158
Optional< CoroutineHandle > handle
The coroutine handle.
Definition Channel.hpp:165
Reader * next
Next reader in the queue.
Definition Channel.hpp:169
Reader(Channel &ch) noexcept
Creates a new channel reader.
Definition Channel.hpp:178
Optional< T > await_resume() &
Definition Channel.hpp:337
Awaitable writer for channel
Definition Channel.hpp:215
void await_suspend(CoroutineHandle coro) &
Definition Channel.hpp:367
bool await_resume() &
Definition Channel.hpp:378
Writer(Channel &channel, T &&value) noexcept
Creates a new writer for the given channel with the given pv value.
Definition Channel.hpp:239
Writer(const Writer &)=delete
Deleted copy constructor.
friend Channel
Definition Channel.hpp:217
bool await_ready() const &
Definition Channel.hpp:354
Writer(Writer &&rhs) noexcept
Creates a new writer by moving the contents of another.
Definition Channel.hpp:250
Optional< CoroutineHandle > handle
The coroutine handle.
Definition Channel.hpp:224
Writer & operator=(Writer &&rhs) noexcept
Assigns the writer by moving the contents of another.
Definition Channel.hpp:259
Writer & operator=(const Writer &)=delete
Deleted copy assignment operator.
~Writer() noexcept=default
Destroys the writer.
Channel * channel
The channel to push this reader.
Definition Channel.hpp:231
The coroutine channel is a helper utility class that allows to implement easy object passing between ...
Definition Channel.hpp:42
Lockable Mutex
The mutex type used to lock the channel.
Definition Channel.hpp:57
Channel(Channel &&)=delete
Deleted copy assignment operator.
Channel & operator=(const Channel &)=delete
Deleted move constructor.
Writer write(T value) noexcept
Writes a value to the channel.
Definition Channel.hpp:100
T & Reference
A reference of T
Definition Channel.hpp:54
LinkedList< Reader > readers
A LinkedList of Readers.
Definition Channel.hpp:72
Mutex mutex
The mutex that synchronizes access to the channel.
Definition Channel.hpp:67
Channel & operator=(Channel &&)=delete
Deleted move assignment operator.
~Channel() noexcept
Destroys the channel.
Definition Channel.hpp:276
typename std::coroutine_handle< void > CoroutineHandle
A type-alias to the coroutine handle used by channels.
Definition Channel.hpp:47
Reader read() noexcept
Reads a value from the channel.
Definition Channel.hpp:104
Channel(const Channel &)=delete
Deleted copy constructor.
static constexpr void * poison() noexcept
Definition Channel.hpp:109
Channel()=default
Create a new channel.
LinkedList< Writer > writers
A LinkedList of Writers.
Definition Channel.hpp:75
T * Pointer
A pointer to T
Definition Channel.hpp:51
The class Lock is a mutex wrapper that provides a convenient RAII-style mechanism for owning one or m...
Definition Threading.hpp:246
Definition Optional.hpp:17
Definition Application.hpp:19
decltype(auto) lock(Func &&func, Ts &... objects)
Definition Threading.hpp:667
constexpr size_t hash(const T &v)
Generates a hash for the provided type.
Definition Hash.hpp:25