100.00% Lines (21/21) 100.00% Functions (5/5)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
  3 + // Copyright (c) 2026 Michael Vandeberg
3   // 4   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 5   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 7   //
7   // Official repository: https://github.com/cppalliance/capy 8   // Official repository: https://github.com/cppalliance/capy
8   // 9   //
9   10  
10   #ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP 11   #ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
11   #define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP 12   #define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
12   13  
  14 + #include <boost/capy/continuation.hpp>
13   #include <boost/capy/detail/config.hpp> 15   #include <boost/capy/detail/config.hpp>
14   #include <boost/capy/ex/frame_allocator.hpp> 16   #include <boost/capy/ex/frame_allocator.hpp>
15 - #include <coroutine>  
16 - #include <cstddef>  
17 - #include <exception>  
18 -  
19   17  
20   namespace boost { 18   namespace boost {
21   namespace capy { 19   namespace capy {
22   namespace detail { 20   namespace detail {
23   21  
24 - class strand_queue; 22 + /** Single-threaded intrusive FIFO of pending continuations.
25 -  
26 - //----------------------------------------------------------  
27 -  
28 - // Metadata stored before the coroutine frame  
29 - struct frame_prefix  
30 - {  
31 - frame_prefix* next;  
32 - strand_queue* queue;  
33 - std::size_t alloc_size;  
34 - };  
35 -  
36 - //----------------------------------------------------------  
37 -  
38 - /** Wrapper coroutine for strand queue dispatch operations.  
39 -  
40 - This coroutine wraps a target coroutine handle and resumes  
41 - it when dispatched. The wrapper ensures control returns to  
42 - the dispatch loop after the target suspends or completes.  
43 -  
44 - The promise contains an intrusive list node for queue  
45 - storage and supports a custom allocator that recycles  
46 - coroutine frames via a free list.  
47 - */  
48 - struct strand_op  
49 - {  
50 - struct promise_type  
51 - {  
52 - promise_type* next = nullptr;  
53 -  
54 - void*  
55 - operator new(  
56 - std::size_t size,  
57 - strand_queue& q,  
58 - std::coroutine_handle<void>);  
59 -  
60 - void  
61 - operator delete(void* p, std::size_t);  
62 -  
63 - strand_op  
DCB 64 - 30340 get_return_object() noexcept  
65 - {  
DCB 66 - 30340 return {std::coroutine_handle<promise_type>::from_promise(*this)};  
67 - }  
68 -  
69 - std::suspend_always  
DCB 70 - 30340 initial_suspend() noexcept  
71 - {  
DCB 72 - 30340 return {};  
73 - }  
74 -  
75 - std::suspend_always  
DCB 76 - 30340 final_suspend() noexcept  
77 - {  
DCB 78 - 30340 return {};  
79 - }  
80 -  
81 - void  
DCB 82 - 30340 return_void() noexcept  
83 - {  
DCB 84 - 30340 }  
85 -  
86 - void  
87 - unhandled_exception()  
88 - {  
89 - std::terminate();  
90 - }  
91 - };  
92 -  
93 - std::coroutine_handle<promise_type> h_;  
94 - };  
95 -  
96 - //----------------------------------------------------------  
97 -  
98 - /** Single-threaded dispatch queue for coroutine handles.  
99 -  
100 - This queue stores coroutine handles and resumes them  
101 - sequentially when dispatch() is called. Each pushed  
102 - handle is wrapped in a strand_op coroutine that ensures  
103 - control returns to the dispatch loop after the target  
104 - suspends or completes.  
105   23  
106 - The queue uses an intrusive singly-linked list through 24 + Links continuations directly through `continuation::next`, so
107 - the promise type to avoid separate node allocations. 25 + push() carries no per-item allocation.
108 - A free list recycles wrapper coroutine frames to reduce  
109 - allocation overhead during repeated push/dispatch cycles.  
110   26  
111   @par Thread Safety 27   @par Thread Safety
112 - This class is not thread-safe. All operations must be 28 + Not thread-safe. Caller must externally synchronize push() and
113 - called from a single thread. 29 + take_all(). dispatch_batch() does not touch queue state and may
  30 + run unlocked once the batch has been taken.
114   */ 31   */
115   class strand_queue 32   class strand_queue
116   { 33   {
117 - using promise_type = strand_op::promise_type; 34 + continuation* head_ = nullptr;
118 - 35 + continuation* tail_ = nullptr;
119 - promise_type* head_ = nullptr;  
120 - promise_type* tail_ = nullptr;  
121 - frame_prefix* free_list_ = nullptr;  
122 -  
123 - friend struct strand_op::promise_type;  
124 -  
125 - static  
126 - strand_op  
DCB 127 - 30340 make_strand_op(  
128 - strand_queue& q,  
129 - std::coroutine_handle<void> target)  
130 - {  
131 - (void)q;  
132 - safe_resume(target);  
133 - co_return;  
DCB 134 - 60680 }  
135   36  
136   public: 37   public:
DCB 137 - 11442  
HITGIC 138   strand_queue() = default; 38   11442 strand_queue() = default;
139   strand_queue(strand_queue const&) = delete; 39   strand_queue(strand_queue const&) = delete;
140   strand_queue& operator=(strand_queue const&) = delete; 40   strand_queue& operator=(strand_queue const&) = delete;
141   41  
142 - /** Destructor. 42 + /** Returns true if there are no pending continuations. */
143 -  
144 - Destroys any pending wrappers without resuming them,  
145 - then frees all memory in the free list.  
146 - */  
DCB 147 - 11442 ~strand_queue()  
148 - {  
149 - // Destroy pending wrappers  
DCB 150 - 11442 while(head_)  
151 - {  
DUB 152 - promise_type* p = head_;  
DUB 153 - head_ = p->next;  
154 -  
DUB 155 - auto h = std::coroutine_handle<promise_type>::from_promise(*p);  
DUB 156 - h.destroy();  
157 - }  
158 -  
159 - // Free the free list memory  
DCB 160 - 11442 while(free_list_)  
161 - {  
DUB 162 - frame_prefix* prefix = free_list_;  
DUB 163 - free_list_ = prefix->next;  
DUB 164 - ::operator delete(prefix);  
165 - }  
DCB 166 - 11442 }  
167 -  
168 - /** Returns true if there are no pending operations.  
169 - */  
170   bool 43   bool
HITCBC 171   20319 empty() const noexcept 44   20284 empty() const noexcept
172   { 45   {
HITCBC 173   20319 return head_ == nullptr; 46   20284 return head_ == nullptr;
174   } 47   }
175   48  
176 - /** Push a coroutine handle to the queue. 49 + /** Push a continuation to the queue.
177 -  
178 - Creates a wrapper coroutine and appends it to the  
179 - queue. The wrapper will resume the target handle  
180 - when dispatch() processes it.  
181   50  
182 - @param h The coroutine handle to dispatch. 51 + @param c The continuation to enqueue; see `continuation`
  52 + for lifetime and aliasing requirements.
183   */ 53   */
184   void 54   void
HITCBC 185 - 30340 push(std::coroutine_handle<void> h) 55 + 30340 push(continuation& c) noexcept
186   { 56   {
HITCBC 187 - 30340 strand_op op = make_strand_op(*this, h); 57 + 30340 c.next = nullptr;
188 -  
DCB 189 - 30340 promise_type* p = &op.h_.promise();  
DCB 190 - 30340 p->next = nullptr;  
191 -  
HITCBC 192   30340 if(tail_) 58   30340 if(tail_)
HITCBC 193 - 10021 tail_->next = p; 59 + 10056 tail_->next = &c;
194   else 60   else
HITCBC 195 - 20319 head_ = p; 61 + 20284 head_ = &c;
HITCBC 196 - 30340 tail_ = p; 62 + 30340 tail_ = &c;
DCB 197 - 30340 }  
198 -  
199 - /** Resume all queued coroutines in sequence.  
200 -  
201 - Processes each wrapper in FIFO order, resuming its  
202 - target coroutine. After each target suspends or  
203 - completes, the wrapper is destroyed and its frame  
204 - is added to the free list for reuse.  
205 -  
206 - Coroutines resumed during dispatch may push new  
207 - handles, which will also be processed in the same  
208 - dispatch call.  
209 -  
210 - @warning Not thread-safe. Do not call while another  
211 - thread may be calling push().  
212 - */  
213 - void  
214 - dispatch()  
215 - {  
216 - while(head_)  
217 - {  
218 - promise_type* p = head_;  
219 - head_ = p->next;  
220 - if(!head_)  
221 - tail_ = nullptr;  
222 -  
223 - auto h = std::coroutine_handle<promise_type>::from_promise(*p);  
224 - safe_resume(h);  
225 - h.destroy();  
226 - }  
HITGIC 227   } 63   30340 }
228   64  
229   /** Batch of taken items for thread-safe dispatch. */ 65   /** Batch of taken items for thread-safe dispatch. */
230   struct taken_batch 66   struct taken_batch
231   { 67   {
232 - promise_type* head = nullptr; 68 + continuation* head = nullptr;
233 - promise_type* tail = nullptr; 69 + continuation* tail = nullptr;
234   }; 70   };
235   71  
236   /** Take all pending items atomically. 72   /** Take all pending items atomically.
237   73  
238 - Removes all items from the queue and returns them 74 + Removes all items from the queue and returns them as a
239 - as a batch. The queue is left empty. 75 + batch. The queue is left empty.
240   76  
241   @return The batch of taken items. 77   @return The batch of taken items.
242   */ 78   */
243   taken_batch 79   taken_batch
HITCBC 244   20319 take_all() noexcept 80   20284 take_all() noexcept
245   { 81   {
HITCBC 246   20319 taken_batch batch{head_, tail_}; 82   20284 taken_batch batch{head_, tail_};
HITCBC 247   20319 head_ = tail_ = nullptr; 83   20284 head_ = tail_ = nullptr;
HITCBC 248   20319 return batch; 84   20284 return batch;
249   } 85   }
250   86  
251 - /** Dispatch a batch of taken items. 87 + /** Resume each continuation in a taken batch.
  88 +
  89 + Advances past each node before resuming, since the
  90 + resumed coroutine may destroy the awaitable (and thus
  91 + the continuation) before control returns here.
252   92  
253   @param batch The batch to dispatch. 93   @param batch The batch to dispatch.
254   94  
255 - @note This is thread-safe w.r.t. push() because it doesn't 95 + @note Thread-safe with respect to push() because the queue
256 - access the queue's free_list_. Frames are deleted directly 96 + itself is not touched.
257 - rather than recycled.  
258   */ 97   */
259   static 98   static
260   void 99   void
HITCBC 261   20319 dispatch_batch(taken_batch& batch) 100   20284 dispatch_batch(taken_batch& batch)
262   { 101   {
HITCBC 263   50659 while(batch.head) 102   50624 while(batch.head)
264   { 103   {
HITCBC 265 - 30340 promise_type* p = batch.head; 104 + 30340 continuation* c = batch.head;
HITCBC 266 - 30340 batch.head = p->next; 105 + 30340 batch.head = c->next;
HITGIC 267 - 106 + 30340 safe_resume(c->h);
DCB 268 - 30340 auto h = std::coroutine_handle<promise_type>::from_promise(*p);  
DCB 269 - 30340 safe_resume(h);  
270 - // Don't use h.destroy() - it would call operator delete which  
271 - // accesses the queue's free_list_ (race with push).  
272 - // Instead, manually free the frame without recycling.  
273 - // h.address() returns the frame base (what operator new returned).  
DCB 274 - 30340 frame_prefix* prefix = static_cast<frame_prefix*>(h.address()) - 1;  
DCB 275 - 30340 ::operator delete(prefix);  
276   } 107   }
HITCBC 277   20319 batch.tail = nullptr; 108   20284 batch.tail = nullptr;
HITCBC 278   20319 } 109   20284 }
279 -  
280 - //----------------------------------------------------------  
281 -  
282 - inline  
283 - void*  
284 - strand_op::promise_type::operator new(  
DCB 285 - 30340 std::size_t size,  
286 - strand_queue& q,  
287 - std::coroutine_handle<void>)  
288 - {  
289 - // Total size includes prefix  
290 - std::size_t alloc_size = size + sizeof(frame_prefix);  
DCB 291 - 30340 void* raw;  
292 -  
293 - // Try to reuse from free list  
294 - if(q.free_list_)  
DCB 295 - 30340 {  
296 - frame_prefix* prefix = q.free_list_;  
DUB 297 - q.free_list_ = prefix->next;  
DUB 298 - raw = prefix;  
DUB 299 - }  
300 - else  
301 - {  
302 - raw = ::operator new(alloc_size);  
DCB 303 - 30340 }  
304 -  
305 - // Initialize prefix  
306 - frame_prefix* prefix = static_cast<frame_prefix*>(raw);  
DCB 307 - 30340 prefix->next = nullptr;  
DCB 308 - 30340 prefix->queue = &q;  
DCB 309 - 30340 prefix->alloc_size = alloc_size;  
DCB 310 - 30340  
311 - // Return pointer AFTER the prefix (this is where coroutine frame goes)  
312 - return prefix + 1;  
DCB 313 - 30340 }  
314 -  
315 - inline  
316 - void  
317 - strand_op::promise_type::operator delete(void* p, std::size_t)  
DUB 318 - {  
319 - // Calculate back to get the prefix  
320 - frame_prefix* prefix = static_cast<frame_prefix*>(p) - 1;  
DUB 321 -  
322 - // Add to free list  
323 - prefix->next = prefix->queue->free_list_;  
DUB 324 - prefix->queue->free_list_ = prefix;  
DUB 325 - }  
EUB 326   }; 110   };
327   111  
328   } // namespace detail 112   } // namespace detail
329   } // namespace capy 113   } // namespace capy
330   } // namespace boost 114   } // namespace boost
331   115  
332   #endif 116   #endif