src/ex/detail/strand_queue.hpp
74.2% Lines (49/66)
85.7% Functions (12/14)
src/ex/detail/strand_queue.hpp
| Line | Hits | Source Code |
|---|---|---|
| 1 | // | |
| 2 | // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) | |
| 3 | // | |
| 4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |
| 5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
| 6 | // | |
| 7 | // Official repository: https://github.com/cppalliance/capy | |
| 8 | // | |
| 9 | ||
| 10 | #ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP | |
| 11 | #define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP | |
| 12 | ||
| 13 | #include <boost/capy/detail/config.hpp> | |
| 14 | ||
| 15 | #include <coroutine> | |
| 16 | #include <cstddef> | |
| 17 | #include <exception> | |
| 18 | ||
| 19 | namespace boost { | |
| 20 | namespace capy { | |
| 21 | namespace detail { | |
| 22 | ||
class strand_queue;

//----------------------------------------------------------

/** Bookkeeping header stored immediately before a wrapper coroutine frame.

    The promise's operator new allocates room for this header plus
    the frame, fills the header in at the start of the allocation,
    and hands out the address just past it (`prefix + 1`) as the
    frame storage. The matching deallocation paths step back by one
    header (`ptr - 1`) to recover the original allocation.

    The header is aligned to the default `operator new` alignment,
    which also pads its size to a multiple of that alignment. Without
    the padding the header would be three pointer-sized words (e.g.
    24 bytes with 8-byte pointers), and the frame placed after it
    would start on a boundary weaker than the one the underlying
    allocation provides.
*/
struct alignas(__STDCPP_DEFAULT_NEW_ALIGNMENT__) frame_prefix
{
    // Next recycled block when this block sits on a free list
    frame_prefix* next;

    // Queue whose free list receives this block on deallocation
    strand_queue* queue;

    // Total bytes of the allocation, header included
    std::size_t alloc_size;
};
| 34 | ||
| 35 | //---------------------------------------------------------- | |
| 36 | ||
| 37 | /** Wrapper coroutine for strand queue dispatch operations. | |
| 38 | ||
| 39 | This coroutine wraps a target coroutine handle and resumes | |
| 40 | it when dispatched. The wrapper ensures control returns to | |
| 41 | the dispatch loop after the target suspends or completes. | |
| 42 | ||
| 43 | The promise contains an intrusive list node for queue | |
| 44 | storage and supports a custom allocator that recycles | |
| 45 | coroutine frames via a free list. | |
| 46 | */ | |
| 47 | struct strand_op | |
| 48 | { | |
| 49 | struct promise_type | |
| 50 | { | |
| 51 | promise_type* next = nullptr; | |
| 52 | ||
| 53 | void* | |
| 54 | operator new( | |
| 55 | std::size_t size, | |
| 56 | strand_queue& q, | |
| 57 | std::coroutine_handle<void>); | |
| 58 | ||
| 59 | void | |
| 60 | operator delete(void* p, std::size_t); | |
| 61 | ||
| 62 | strand_op | |
| 63 | 322 | get_return_object() noexcept |
| 64 | { | |
| 65 | 322 | return {std::coroutine_handle<promise_type>::from_promise(*this)}; |
| 66 | } | |
| 67 | ||
| 68 | std::suspend_always | |
| 69 | 322 | initial_suspend() noexcept |
| 70 | { | |
| 71 | 322 | return {}; |
| 72 | } | |
| 73 | ||
| 74 | std::suspend_always | |
| 75 | 322 | final_suspend() noexcept |
| 76 | { | |
| 77 | 322 | return {}; |
| 78 | } | |
| 79 | ||
| 80 | void | |
| 81 | 322 | return_void() noexcept |
| 82 | { | |
| 83 | 322 | } |
| 84 | ||
| 85 | void | |
| 86 | ✗ | unhandled_exception() |
| 87 | { | |
| 88 | ✗ | std::terminate(); |
| 89 | } | |
| 90 | }; | |
| 91 | ||
| 92 | std::coroutine_handle<promise_type> h_; | |
| 93 | }; | |
| 94 | ||
| 95 | //---------------------------------------------------------- | |
| 96 | ||
| 97 | /** Single-threaded dispatch queue for coroutine handles. | |
| 98 | ||
| 99 | This queue stores coroutine handles and resumes them | |
| 100 | sequentially when dispatch() is called. Each pushed | |
| 101 | handle is wrapped in a strand_op coroutine that ensures | |
| 102 | control returns to the dispatch loop after the target | |
| 103 | suspends or completes. | |
| 104 | ||
| 105 | The queue uses an intrusive singly-linked list through | |
| 106 | the promise type to avoid separate node allocations. | |
| 107 | A free list recycles wrapper coroutine frames to reduce | |
| 108 | allocation overhead during repeated push/dispatch cycles. | |
| 109 | ||
| 110 | @par Thread Safety | |
| 111 | This class is not thread-safe. All operations must be | |
| 112 | called from a single thread. | |
| 113 | */ | |
| 114 | class strand_queue | |
| 115 | { | |
| 116 | using promise_type = strand_op::promise_type; | |
| 117 | ||
| 118 | promise_type* head_ = nullptr; | |
| 119 | promise_type* tail_ = nullptr; | |
| 120 | frame_prefix* free_list_ = nullptr; | |
| 121 | ||
| 122 | friend struct strand_op::promise_type; | |
| 123 | ||
| 124 | static | |
| 125 | strand_op | |
| 126 | 322 | make_strand_op( |
| 127 | strand_queue& q, | |
| 128 | std::coroutine_handle<void> target) | |
| 129 | { | |
| 130 | (void)q; | |
| 131 | target.resume(); | |
| 132 | co_return; | |
| 133 | 644 | } |
| 134 | ||
| 135 | public: | |
| 136 | 4009 | strand_queue() = default; |
| 137 | ||
| 138 | strand_queue(strand_queue const&) = delete; | |
| 139 | strand_queue& operator=(strand_queue const&) = delete; | |
| 140 | ||
| 141 | /** Destructor. | |
| 142 | ||
| 143 | Destroys any pending wrappers without resuming them, | |
| 144 | then frees all memory in the free list. | |
| 145 | */ | |
| 146 | 4009 | ~strand_queue() |
| 147 | { | |
| 148 | // Destroy pending wrappers | |
| 149 | 4009 | while(head_) |
| 150 | { | |
| 151 | ✗ | promise_type* p = head_; |
| 152 | ✗ | head_ = p->next; |
| 153 | ||
| 154 | ✗ | auto h = std::coroutine_handle<promise_type>::from_promise(*p); |
| 155 | ✗ | h.destroy(); |
| 156 | } | |
| 157 | ||
| 158 | // Free the free list memory | |
| 159 | 4009 | while(free_list_) |
| 160 | { | |
| 161 | ✗ | frame_prefix* prefix = free_list_; |
| 162 | ✗ | free_list_ = prefix->next; |
| 163 | ✗ | ::operator delete(prefix); |
| 164 | } | |
| 165 | 4009 | } |
| 166 | ||
| 167 | /** Returns true if there are no pending operations. | |
| 168 | */ | |
| 169 | bool | |
| 170 | 13 | empty() const noexcept |
| 171 | { | |
| 172 | 13 | return head_ == nullptr; |
| 173 | } | |
| 174 | ||
| 175 | /** Push a coroutine handle to the queue. | |
| 176 | ||
| 177 | Creates a wrapper coroutine and appends it to the | |
| 178 | queue. The wrapper will resume the target handle | |
| 179 | when dispatch() processes it. | |
| 180 | ||
| 181 | @param h The coroutine handle to dispatch. | |
| 182 | */ | |
| 183 | void | |
| 184 | 322 | push(std::coroutine_handle<void> h) |
| 185 | { | |
| 186 | 322 | strand_op op = make_strand_op(*this, h); |
| 187 | ||
| 188 | 322 | promise_type* p = &op.h_.promise(); |
| 189 | 322 | p->next = nullptr; |
| 190 | ||
| 191 | 322 | if(tail_) |
| 192 | 309 | tail_->next = p; |
| 193 | else | |
| 194 | 13 | head_ = p; |
| 195 | 322 | tail_ = p; |
| 196 | 322 | } |
| 197 | ||
| 198 | /** Resume all queued coroutines in sequence. | |
| 199 | ||
| 200 | Processes each wrapper in FIFO order, resuming its | |
| 201 | target coroutine. After each target suspends or | |
| 202 | completes, the wrapper is destroyed and its frame | |
| 203 | is added to the free list for reuse. | |
| 204 | ||
| 205 | Coroutines resumed during dispatch may push new | |
| 206 | handles, which will also be processed in the same | |
| 207 | dispatch call. | |
| 208 | ||
| 209 | @warning Not thread-safe. Do not call while another | |
| 210 | thread may be calling push(). | |
| 211 | */ | |
| 212 | void | |
| 213 | dispatch() | |
| 214 | { | |
| 215 | while(head_) | |
| 216 | { | |
| 217 | promise_type* p = head_; | |
| 218 | head_ = p->next; | |
| 219 | if(!head_) | |
| 220 | tail_ = nullptr; | |
| 221 | ||
| 222 | auto h = std::coroutine_handle<promise_type>::from_promise(*p); | |
| 223 | h.resume(); | |
| 224 | h.destroy(); | |
| 225 | } | |
| 226 | } | |
| 227 | ||
| 228 | /** Batch of taken items for thread-safe dispatch. */ | |
| 229 | struct taken_batch | |
| 230 | { | |
| 231 | promise_type* head = nullptr; | |
| 232 | promise_type* tail = nullptr; | |
| 233 | }; | |
| 234 | ||
| 235 | /** Take all pending items atomically. | |
| 236 | ||
| 237 | Removes all items from the queue and returns them | |
| 238 | as a batch. The queue is left empty. | |
| 239 | ||
| 240 | @return The batch of taken items. | |
| 241 | */ | |
| 242 | taken_batch | |
| 243 | 13 | take_all() noexcept |
| 244 | { | |
| 245 | 13 | taken_batch batch{head_, tail_}; |
| 246 | 13 | head_ = tail_ = nullptr; |
| 247 | 13 | return batch; |
| 248 | } | |
| 249 | ||
| 250 | /** Dispatch a batch of taken items. | |
| 251 | ||
| 252 | @param batch The batch to dispatch. | |
| 253 | ||
| 254 | @note This is thread-safe w.r.t. push() because it doesn't | |
| 255 | access the queue's free_list_. Frames are deleted directly | |
| 256 | rather than recycled. | |
| 257 | */ | |
| 258 | static | |
| 259 | void | |
| 260 | 13 | dispatch_batch(taken_batch& batch) |
| 261 | { | |
| 262 | 335 | while(batch.head) |
| 263 | { | |
| 264 | 322 | promise_type* p = batch.head; |
| 265 | 322 | batch.head = p->next; |
| 266 | ||
| 267 | 322 | auto h = std::coroutine_handle<promise_type>::from_promise(*p); |
| 268 | 322 | h.resume(); |
| 269 | // Don't use h.destroy() - it would call operator delete which | |
| 270 | // accesses the queue's free_list_ (race with push). | |
| 271 | // Instead, manually free the frame without recycling. | |
| 272 | // h.address() returns the frame base (what operator new returned). | |
| 273 | 322 | frame_prefix* prefix = static_cast<frame_prefix*>(h.address()) - 1; |
| 274 | 322 | ::operator delete(prefix); |
| 275 | } | |
| 276 | 13 | batch.tail = nullptr; |
| 277 | 13 | } |
| 278 | }; | |
| 279 | ||
| 280 | //---------------------------------------------------------- | |
| 281 | ||
| 282 | inline | |
| 283 | void* | |
| 284 | 322 | strand_op::promise_type::operator new( |
| 285 | std::size_t size, | |
| 286 | strand_queue& q, | |
| 287 | std::coroutine_handle<void>) | |
| 288 | { | |
| 289 | // Total size includes prefix | |
| 290 | 322 | std::size_t alloc_size = size + sizeof(frame_prefix); |
| 291 | void* raw; | |
| 292 | ||
| 293 | // Try to reuse from free list | |
| 294 | 322 | if(q.free_list_) |
| 295 | { | |
| 296 | ✗ | frame_prefix* prefix = q.free_list_; |
| 297 | ✗ | q.free_list_ = prefix->next; |
| 298 | ✗ | raw = prefix; |
| 299 | } | |
| 300 | else | |
| 301 | { | |
| 302 | 322 | raw = ::operator new(alloc_size); |
| 303 | } | |
| 304 | ||
| 305 | // Initialize prefix | |
| 306 | 322 | frame_prefix* prefix = static_cast<frame_prefix*>(raw); |
| 307 | 322 | prefix->next = nullptr; |
| 308 | 322 | prefix->queue = &q; |
| 309 | 322 | prefix->alloc_size = alloc_size; |
| 310 | ||
| 311 | // Return pointer AFTER the prefix (this is where coroutine frame goes) | |
| 312 | 322 | return prefix + 1; |
| 313 | } | |
| 314 | ||
| 315 | inline | |
| 316 | void | |
| 317 | ✗ | strand_op::promise_type::operator delete(void* p, std::size_t) |
| 318 | { | |
| 319 | // Calculate back to get the prefix | |
| 320 | ✗ | frame_prefix* prefix = static_cast<frame_prefix*>(p) - 1; |
| 321 | ||
| 322 | // Add to free list | |
| 323 | ✗ | prefix->next = prefix->queue->free_list_; |
| 324 | ✗ | prefix->queue->free_list_ = prefix; |
| 325 | ✗ | } |
| 326 | ||
| 327 | } // namespace detail | |
| 328 | } // namespace capy | |
| 329 | } // namespace boost | |
| 330 | ||
| 331 | #endif | |
| 332 |