TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Michael Vandeberg
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/cppalliance/capy
9 : //
10 :
11 : #ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
12 : #define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
13 :
14 : #include <boost/capy/continuation.hpp>
15 : #include <boost/capy/detail/config.hpp>
16 : #include <boost/capy/ex/frame_allocator.hpp>
17 :
18 : namespace boost {
19 : namespace capy {
20 : namespace detail {
21 :
22 : /** Single-threaded intrusive FIFO of pending continuations.
23 :
24 : Links continuations directly through `continuation::next`, so
25 : push() carries no per-item allocation.
26 :
27 : @par Thread Safety
28 : Not thread-safe. Caller must externally synchronize push() and
29 : take_all(). dispatch_batch() does not touch queue state and may
30 : run unlocked once the batch has been taken.
31 : */
32 : class strand_queue
33 : {
34 : continuation* head_ = nullptr;
35 : continuation* tail_ = nullptr;
36 :
37 : public:
38 HIT 11442 : strand_queue() = default;
39 : strand_queue(strand_queue const&) = delete;
40 : strand_queue& operator=(strand_queue const&) = delete;
41 :
42 : /** Returns true if there are no pending continuations. */
43 : bool
44 20284 : empty() const noexcept
45 : {
46 20284 : return head_ == nullptr;
47 : }
48 :
49 : /** Push a continuation to the queue.
50 :
51 : @param c The continuation to enqueue; see `continuation`
52 : for lifetime and aliasing requirements.
53 : */
54 : void
55 30340 : push(continuation& c) noexcept
56 : {
57 30340 : c.next = nullptr;
58 30340 : if(tail_)
59 10056 : tail_->next = &c;
60 : else
61 20284 : head_ = &c;
62 30340 : tail_ = &c;
63 30340 : }
64 :
65 : /** Batch of taken items for thread-safe dispatch. */
66 : struct taken_batch
67 : {
68 : continuation* head = nullptr;
69 : continuation* tail = nullptr;
70 : };
71 :
72 : /** Take all pending items atomically.
73 :
74 : Removes all items from the queue and returns them as a
75 : batch. The queue is left empty.
76 :
77 : @return The batch of taken items.
78 : */
79 : taken_batch
80 20284 : take_all() noexcept
81 : {
82 20284 : taken_batch batch{head_, tail_};
83 20284 : head_ = tail_ = nullptr;
84 20284 : return batch;
85 : }
86 :
87 : /** Resume each continuation in a taken batch.
88 :
89 : Advances past each node before resuming, since the
90 : resumed coroutine may destroy the awaitable (and thus
91 : the continuation) before control returns here.
92 :
93 : @param batch The batch to dispatch.
94 :
95 : @note Thread-safe with respect to push() because the queue
96 : itself is not touched.
97 : */
98 : static
99 : void
100 20284 : dispatch_batch(taken_batch& batch)
101 : {
102 50624 : while(batch.head)
103 : {
104 30340 : continuation* c = batch.head;
105 30340 : batch.head = c->next;
106 30340 : safe_resume(c->h);
107 : }
108 20284 : batch.tail = nullptr;
109 20284 : }
110 : };
111 :
112 : } // namespace detail
113 : } // namespace capy
114 : } // namespace boost
115 :
116 : #endif
|