100.00% Lines (21/21)
100.00% Functions (5/5)
| TLA | Baseline | Branch | ||||||
|---|---|---|---|---|---|---|---|---|
| Line | Hits | Code | Line | Hits | Code | |||
| 1 | // | 1 | // | |||||
| 2 | // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) | 2 | // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) | |||||
| 3 | + | // Copyright (c) 2026 Michael Vandeberg | ||||||
| 3 | // | 4 | // | |||||
| 4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | 5 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |||||
| 5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | 6 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |||||
| 6 | // | 7 | // | |||||
| 7 | // Official repository: https://github.com/cppalliance/capy | 8 | // Official repository: https://github.com/cppalliance/capy | |||||
| 8 | // | 9 | // | |||||
| 9 | 10 | |||||||
| 10 | #ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP | 11 | #ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP | |||||
| 11 | #define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP | 12 | #define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP | |||||
| 12 | 13 | |||||||
| 14 | + | #include <boost/capy/continuation.hpp> | ||||||
| 13 | #include <boost/capy/detail/config.hpp> | 15 | #include <boost/capy/detail/config.hpp> | |||||
| 14 | #include <boost/capy/ex/frame_allocator.hpp> | 16 | #include <boost/capy/ex/frame_allocator.hpp> | |||||
| 15 | - | #include <coroutine> | ||||||
| 16 | - | #include <cstddef> | ||||||
| 17 | - | #include <exception> | ||||||
| 18 | - | |||||||
| 19 | 17 | |||||||
| 20 | namespace boost { | 18 | namespace boost { | |||||
| 21 | namespace capy { | 19 | namespace capy { | |||||
| 22 | namespace detail { | 20 | namespace detail { | |||||
| 23 | 21 | |||||||
| 24 | - | class strand_queue; | 22 | + | /** Single-threaded intrusive FIFO of pending continuations. | |||
| 25 | - | |||||||
| 26 | - | //---------------------------------------------------------- | ||||||
| 27 | - | |||||||
| 28 | - | // Metadata stored before the coroutine frame | ||||||
| 29 | - | struct frame_prefix | ||||||
| 30 | - | { | ||||||
| 31 | - | frame_prefix* next; | ||||||
| 32 | - | strand_queue* queue; | ||||||
| 33 | - | std::size_t alloc_size; | ||||||
| 34 | - | }; | ||||||
| 35 | - | |||||||
| 36 | - | //---------------------------------------------------------- | ||||||
| 37 | - | |||||||
| 38 | - | /** Wrapper coroutine for strand queue dispatch operations. | ||||||
| 39 | - | |||||||
| 40 | - | This coroutine wraps a target coroutine handle and resumes | ||||||
| 41 | - | it when dispatched. The wrapper ensures control returns to | ||||||
| 42 | - | the dispatch loop after the target suspends or completes. | ||||||
| 43 | - | |||||||
| 44 | - | The promise contains an intrusive list node for queue | ||||||
| 45 | - | storage and supports a custom allocator that recycles | ||||||
| 46 | - | coroutine frames via a free list. | ||||||
| 47 | - | */ | ||||||
| 48 | - | struct strand_op | ||||||
| 49 | - | { | ||||||
| 50 | - | struct promise_type | ||||||
| 51 | - | { | ||||||
| 52 | - | promise_type* next = nullptr; | ||||||
| 53 | - | |||||||
| 54 | - | void* | ||||||
| 55 | - | operator new( | ||||||
| 56 | - | std::size_t size, | ||||||
| 57 | - | strand_queue& q, | ||||||
| 58 | - | std::coroutine_handle<void>); | ||||||
| 59 | - | |||||||
| 60 | - | void | ||||||
| 61 | - | operator delete(void* p, std::size_t); | ||||||
| 62 | - | |||||||
| 63 | - | strand_op | ||||||
| DCB | 64 | - | 30340 | get_return_object() noexcept | ||||
| 65 | - | { | ||||||
| DCB | 66 | - | 30340 | return {std::coroutine_handle<promise_type>::from_promise(*this)}; | ||||
| 67 | - | } | ||||||
| 68 | - | |||||||
| 69 | - | std::suspend_always | ||||||
| DCB | 70 | - | 30340 | initial_suspend() noexcept | ||||
| 71 | - | { | ||||||
| DCB | 72 | - | 30340 | return {}; | ||||
| 73 | - | } | ||||||
| 74 | - | |||||||
| 75 | - | std::suspend_always | ||||||
| DCB | 76 | - | 30340 | final_suspend() noexcept | ||||
| 77 | - | { | ||||||
| DCB | 78 | - | 30340 | return {}; | ||||
| 79 | - | } | ||||||
| 80 | - | |||||||
| 81 | - | void | ||||||
| DCB | 82 | - | 30340 | return_void() noexcept | ||||
| 83 | - | { | ||||||
| DCB | 84 | - | 30340 | } | ||||
| 85 | - | |||||||
| 86 | - | void | ||||||
| 87 | - | unhandled_exception() | ||||||
| 88 | - | { | ||||||
| 89 | - | std::terminate(); | ||||||
| 90 | - | } | ||||||
| 91 | - | }; | ||||||
| 92 | - | |||||||
| 93 | - | std::coroutine_handle<promise_type> h_; | ||||||
| 94 | - | }; | ||||||
| 95 | - | |||||||
| 96 | - | //---------------------------------------------------------- | ||||||
| 97 | - | |||||||
| 98 | - | /** Single-threaded dispatch queue for coroutine handles. | ||||||
| 99 | - | |||||||
| 100 | - | This queue stores coroutine handles and resumes them | ||||||
| 101 | - | sequentially when dispatch() is called. Each pushed | ||||||
| 102 | - | handle is wrapped in a strand_op coroutine that ensures | ||||||
| 103 | - | control returns to the dispatch loop after the target | ||||||
| 104 | - | suspends or completes. | ||||||
| 105 | 23 | |||||||
| 106 | - | The queue uses an intrusive singly-linked list through | 24 | + | Links continuations directly through `continuation::next`, so | |||
| 107 | - | the promise type to avoid separate node allocations. | 25 | + | push() carries no per-item allocation. | |||
| 108 | - | A free list recycles wrapper coroutine frames to reduce | ||||||
| 109 | - | allocation overhead during repeated push/dispatch cycles. | ||||||
| 110 | 26 | |||||||
| 111 | @par Thread Safety | 27 | @par Thread Safety | |||||
| 112 | - | This class is not thread-safe. All operations must be | 28 | + | Not thread-safe. Caller must externally synchronize push() and | |||
| 113 | - | called from a single thread. | 29 | + | take_all(). dispatch_batch() does not touch queue state and may | |||
| 30 | + | run unlocked once the batch has been taken. | ||||||
| 114 | */ | 31 | */ | |||||
| 115 | class strand_queue | 32 | class strand_queue | |||||
| 116 | { | 33 | { | |||||
| 117 | - | using promise_type = strand_op::promise_type; | 34 | + | continuation* head_ = nullptr; | |||
| 118 | - | 35 | + | continuation* tail_ = nullptr; | ||||
| 119 | - | promise_type* head_ = nullptr; | ||||||
| 120 | - | promise_type* tail_ = nullptr; | ||||||
| 121 | - | frame_prefix* free_list_ = nullptr; | ||||||
| 122 | - | |||||||
| 123 | - | friend struct strand_op::promise_type; | ||||||
| 124 | - | |||||||
| 125 | - | static | ||||||
| 126 | - | strand_op | ||||||
| DCB | 127 | - | 30340 | make_strand_op( | ||||
| 128 | - | strand_queue& q, | ||||||
| 129 | - | std::coroutine_handle<void> target) | ||||||
| 130 | - | { | ||||||
| 131 | - | (void)q; | ||||||
| 132 | - | safe_resume(target); | ||||||
| 133 | - | co_return; | ||||||
| DCB | 134 | - | 60680 | } | ||||
| 135 | 36 | |||||||
| 136 | public: | 37 | public: | |||||
| DCB | 137 | - | 11442 | |||||
| HITGIC | 138 | strand_queue() = default; | 38 | 11442 | strand_queue() = default; | |||
| 139 | strand_queue(strand_queue const&) = delete; | 39 | strand_queue(strand_queue const&) = delete; | |||||
| 140 | strand_queue& operator=(strand_queue const&) = delete; | 40 | strand_queue& operator=(strand_queue const&) = delete; | |||||
| 141 | 41 | |||||||
| 142 | - | /** Destructor. | 42 | + | /** Returns true if there are no pending continuations. */ | |||
| 143 | - | |||||||
| 144 | - | Destroys any pending wrappers without resuming them, | ||||||
| 145 | - | then frees all memory in the free list. | ||||||
| 146 | - | */ | ||||||
| DCB | 147 | - | 11442 | ~strand_queue() | ||||
| 148 | - | { | ||||||
| 149 | - | // Destroy pending wrappers | ||||||
| DCB | 150 | - | 11442 | while(head_) | ||||
| 151 | - | { | ||||||
| DUB | 152 | - | ✗ | promise_type* p = head_; | ||||
| DUB | 153 | - | ✗ | head_ = p->next; | ||||
| 154 | - | |||||||
| DUB | 155 | - | ✗ | auto h = std::coroutine_handle<promise_type>::from_promise(*p); | ||||
| DUB | 156 | - | ✗ | h.destroy(); | ||||
| 157 | - | } | ||||||
| 158 | - | |||||||
| 159 | - | // Free the free list memory | ||||||
| DCB | 160 | - | 11442 | while(free_list_) | ||||
| 161 | - | { | ||||||
| DUB | 162 | - | ✗ | frame_prefix* prefix = free_list_; | ||||
| DUB | 163 | - | ✗ | free_list_ = prefix->next; | ||||
| DUB | 164 | - | ✗ | ::operator delete(prefix); | ||||
| 165 | - | } | ||||||
| DCB | 166 | - | 11442 | } | ||||
| 167 | - | |||||||
| 168 | - | /** Returns true if there are no pending operations. | ||||||
| 169 | - | */ | ||||||
| 170 | bool | 43 | bool | |||||
| HITCBC | 171 | 20319 | empty() const noexcept | 44 | 20284 | empty() const noexcept | ||
| 172 | { | 45 | { | |||||
| HITCBC | 173 | 20319 | return head_ == nullptr; | 46 | 20284 | return head_ == nullptr; | ||
| 174 | } | 47 | } | |||||
| 175 | 48 | |||||||
| 176 | - | /** Push a coroutine handle to the queue. | 49 | + | /** Push a continuation to the queue. | |||
| 177 | - | |||||||
| 178 | - | Creates a wrapper coroutine and appends it to the | ||||||
| 179 | - | queue. The wrapper will resume the target handle | ||||||
| 180 | - | when dispatch() processes it. | ||||||
| 181 | 50 | |||||||
| 182 | - | @param h The coroutine handle to dispatch. | 51 | + | @param c The continuation to enqueue; see `continuation` | |||
| 52 | + | for lifetime and aliasing requirements. | ||||||
| 183 | */ | 53 | */ | |||||
| 184 | void | 54 | void | |||||
| HITCBC | 185 | - | 30340 | push(std::coroutine_handle<void> h) | 55 | + | 30340 | push(continuation& c) noexcept |
| 186 | { | 56 | { | |||||
| HITCBC | 187 | - | 30340 | strand_op op = make_strand_op(*this, h); | 57 | + | 30340 | c.next = nullptr; |
| 188 | - | |||||||
| DCB | 189 | - | 30340 | promise_type* p = &op.h_.promise(); | ||||
| DCB | 190 | - | 30340 | p->next = nullptr; | ||||
| 191 | - | |||||||
| HITCBC | 192 | 30340 | if(tail_) | 58 | 30340 | if(tail_) | ||
| HITCBC | 193 | - | 10021 | tail_->next = p; | 59 | + | 10056 | tail_->next = &c; |
| 194 | else | 60 | else | |||||
| HITCBC | 195 | - | 20319 | head_ = p; | 61 | + | 20284 | head_ = &c; |
| HITCBC | 196 | - | 30340 | tail_ = p; | 62 | + | 30340 | tail_ = &c; |
| DCB | 197 | - | 30340 | } | ||||
| 198 | - | |||||||
| 199 | - | /** Resume all queued coroutines in sequence. | ||||||
| 200 | - | |||||||
| 201 | - | Processes each wrapper in FIFO order, resuming its | ||||||
| 202 | - | target coroutine. After each target suspends or | ||||||
| 203 | - | completes, the wrapper is destroyed and its frame | ||||||
| 204 | - | is added to the free list for reuse. | ||||||
| 205 | - | |||||||
| 206 | - | Coroutines resumed during dispatch may push new | ||||||
| 207 | - | handles, which will also be processed in the same | ||||||
| 208 | - | dispatch call. | ||||||
| 209 | - | |||||||
| 210 | - | @warning Not thread-safe. Do not call while another | ||||||
| 211 | - | thread may be calling push(). | ||||||
| 212 | - | */ | ||||||
| 213 | - | void | ||||||
| 214 | - | dispatch() | ||||||
| 215 | - | { | ||||||
| 216 | - | while(head_) | ||||||
| 217 | - | { | ||||||
| 218 | - | promise_type* p = head_; | ||||||
| 219 | - | head_ = p->next; | ||||||
| 220 | - | if(!head_) | ||||||
| 221 | - | tail_ = nullptr; | ||||||
| 222 | - | |||||||
| 223 | - | auto h = std::coroutine_handle<promise_type>::from_promise(*p); | ||||||
| 224 | - | safe_resume(h); | ||||||
| 225 | - | h.destroy(); | ||||||
| 226 | - | } | ||||||
| HITGIC | 227 | } | 63 | 30340 | } | |||
| 228 | 64 | |||||||
| 229 | /** Batch of taken items for thread-safe dispatch. */ | 65 | /** Batch of taken items for thread-safe dispatch. */ | |||||
| 230 | struct taken_batch | 66 | struct taken_batch | |||||
| 231 | { | 67 | { | |||||
| 232 | - | promise_type* head = nullptr; | 68 | + | continuation* head = nullptr; | |||
| 233 | - | promise_type* tail = nullptr; | 69 | + | continuation* tail = nullptr; | |||
| 234 | }; | 70 | }; | |||||
| 235 | 71 | |||||||
| 236 | /** Take all pending items atomically. | 72 | /** Take all pending items atomically. | |||||
| 237 | 73 | |||||||
| 238 | - | Removes all items from the queue and returns them | 74 | + | Removes all items from the queue and returns them as a | |||
| 239 | - | as a batch. The queue is left empty. | 75 | + | batch. The queue is left empty. | |||
| 240 | 76 | |||||||
| 241 | @return The batch of taken items. | 77 | @return The batch of taken items. | |||||
| 242 | */ | 78 | */ | |||||
| 243 | taken_batch | 79 | taken_batch | |||||
| HITCBC | 244 | 20319 | take_all() noexcept | 80 | 20284 | take_all() noexcept | ||
| 245 | { | 81 | { | |||||
| HITCBC | 246 | 20319 | taken_batch batch{head_, tail_}; | 82 | 20284 | taken_batch batch{head_, tail_}; | ||
| HITCBC | 247 | 20319 | head_ = tail_ = nullptr; | 83 | 20284 | head_ = tail_ = nullptr; | ||
| HITCBC | 248 | 20319 | return batch; | 84 | 20284 | return batch; | ||
| 249 | } | 85 | } | |||||
| 250 | 86 | |||||||
| 251 | - | /** Dispatch a batch of taken items. | 87 | + | /** Resume each continuation in a taken batch. | |||
| 88 | + | |||||||
| 89 | + | Advances past each node before resuming, since the | ||||||
| 90 | + | resumed coroutine may destroy the awaitable (and thus | ||||||
| 91 | + | the continuation) before control returns here. | ||||||
| 252 | 92 | |||||||
| 253 | @param batch The batch to dispatch. | 93 | @param batch The batch to dispatch. | |||||
| 254 | 94 | |||||||
| 255 | - | @note This is thread-safe w.r.t. push() because it doesn't | 95 | + | @note Thread-safe with respect to push() because the queue | |||
| 256 | - | access the queue's free_list_. Frames are deleted directly | 96 | + | itself is not touched. | |||
| 257 | - | rather than recycled. | ||||||
| 258 | */ | 97 | */ | |||||
| 259 | static | 98 | static | |||||
| 260 | void | 99 | void | |||||
| HITCBC | 261 | 20319 | dispatch_batch(taken_batch& batch) | 100 | 20284 | dispatch_batch(taken_batch& batch) | ||
| 262 | { | 101 | { | |||||
| HITCBC | 263 | 50659 | while(batch.head) | 102 | 50624 | while(batch.head) | ||
| 264 | { | 103 | { | |||||
| HITCBC | 265 | - | 30340 | promise_type* p = batch.head; | 104 | + | 30340 | continuation* c = batch.head; |
| HITCBC | 266 | - | 30340 | batch.head = p->next; | 105 | + | 30340 | batch.head = c->next; |
| HITGIC | 267 | - | 106 | + | 30340 | safe_resume(c->h); | ||
| DCB | 268 | - | 30340 | auto h = std::coroutine_handle<promise_type>::from_promise(*p); | ||||
| DCB | 269 | - | 30340 | safe_resume(h); | ||||
| 270 | - | // Don't use h.destroy() - it would call operator delete which | ||||||
| 271 | - | // accesses the queue's free_list_ (race with push). | ||||||
| 272 | - | // Instead, manually free the frame without recycling. | ||||||
| 273 | - | // h.address() returns the frame base (what operator new returned). | ||||||
| DCB | 274 | - | 30340 | frame_prefix* prefix = static_cast<frame_prefix*>(h.address()) - 1; | ||||
| DCB | 275 | - | 30340 | ::operator delete(prefix); | ||||
| 276 | } | 107 | } | |||||
| HITCBC | 277 | 20319 | batch.tail = nullptr; | 108 | 20284 | batch.tail = nullptr; | ||
| HITCBC | 278 | 20319 | } | 109 | 20284 | } | ||
| 279 | - | |||||||
| 280 | - | //---------------------------------------------------------- | ||||||
| 281 | - | |||||||
| 282 | - | inline | ||||||
| 283 | - | void* | ||||||
| 284 | - | strand_op::promise_type::operator new( | ||||||
| DCB | 285 | - | 30340 | std::size_t size, | ||||
| 286 | - | strand_queue& q, | ||||||
| 287 | - | std::coroutine_handle<void>) | ||||||
| 288 | - | { | ||||||
| 289 | - | // Total size includes prefix | ||||||
| 290 | - | std::size_t alloc_size = size + sizeof(frame_prefix); | ||||||
| DCB | 291 | - | 30340 | void* raw; | ||||
| 292 | - | |||||||
| 293 | - | // Try to reuse from free list | ||||||
| 294 | - | if(q.free_list_) | ||||||
| DCB | 295 | - | 30340 | { | ||||
| 296 | - | frame_prefix* prefix = q.free_list_; | ||||||
| DUB | 297 | - | ✗ | q.free_list_ = prefix->next; | ||||
| DUB | 298 | - | ✗ | raw = prefix; | ||||
| DUB | 299 | - | ✗ | } | ||||
| 300 | - | else | ||||||
| 301 | - | { | ||||||
| 302 | - | raw = ::operator new(alloc_size); | ||||||
| DCB | 303 | - | 30340 | } | ||||
| 304 | - | |||||||
| 305 | - | // Initialize prefix | ||||||
| 306 | - | frame_prefix* prefix = static_cast<frame_prefix*>(raw); | ||||||
| DCB | 307 | - | 30340 | prefix->next = nullptr; | ||||
| DCB | 308 | - | 30340 | prefix->queue = &q; | ||||
| DCB | 309 | - | 30340 | prefix->alloc_size = alloc_size; | ||||
| DCB | 310 | - | 30340 | |||||
| 311 | - | // Return pointer AFTER the prefix (this is where coroutine frame goes) | ||||||
| 312 | - | return prefix + 1; | ||||||
| DCB | 313 | - | 30340 | } | ||||
| 314 | - | |||||||
| 315 | - | inline | ||||||
| 316 | - | void | ||||||
| 317 | - | strand_op::promise_type::operator delete(void* p, std::size_t) | ||||||
| DUB | 318 | - | ✗ | { | ||||
| 319 | - | // Calculate back to get the prefix | ||||||
| 320 | - | frame_prefix* prefix = static_cast<frame_prefix*>(p) - 1; | ||||||
| DUB | 321 | - | ✗ | |||||
| 322 | - | // Add to free list | ||||||
| 323 | - | prefix->next = prefix->queue->free_list_; | ||||||
| DUB | 324 | - | ✗ | prefix->queue->free_list_ = prefix; | ||||
| DUB | 325 | - | ✗ | } | ||||
| EUB | 326 | ✗ | }; | 110 | }; | |||
| 327 | 111 | |||||||
| 328 | } // namespace detail | 112 | } // namespace detail | |||||
| 329 | } // namespace capy | 113 | } // namespace capy | |||||
| 330 | } // namespace boost | 114 | } // namespace boost | |||||
| 331 | 115 | |||||||
| 332 | #endif | 116 | #endif | |||||