95.61% Lines (109/114) 95.83% Functions (23/24)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/capy 7   // Official repository: https://github.com/cppalliance/capy
8   // 8   //
9   9  
10   #include "src/ex/detail/strand_impl.hpp" 10   #include "src/ex/detail/strand_impl.hpp"
11   #include <boost/capy/ex/detail/strand_service.hpp> 11   #include <boost/capy/ex/detail/strand_service.hpp>
12   #include <boost/capy/continuation.hpp> 12   #include <boost/capy/continuation.hpp>
13   #include <coroutine> 13   #include <coroutine>
14   #include <memory> 14   #include <memory>
15   #include <utility> 15   #include <utility>
16   16  
17   namespace boost { 17   namespace boost {
18   namespace capy { 18   namespace capy {
19   namespace detail { 19   namespace detail {
20   20  
21   // Sentinel stored in invoker_frame_cache_ after shutdown to prevent 21   // Sentinel stored in invoker_frame_cache_ after shutdown to prevent
22   // in-flight invokers from repopulating a freed cache slot. 22   // in-flight invokers from repopulating a freed cache slot.
23   inline void* const kCacheClosed = reinterpret_cast<void*>(1); 23   inline void* const kCacheClosed = reinterpret_cast<void*>(1);
24   24  
25   /** Concrete strand_service. 25   /** Concrete strand_service.
26   26  
27   Holds a shared mutex pool (193 entries), a linked list of live 27   Holds a shared mutex pool (193 entries), a linked list of live
28   impls (for shutdown traversal), and a single-slot invoker 28   impls (for shutdown traversal), and a single-slot invoker
29   coroutine frame cache shared across all strands of this service. 29   coroutine frame cache shared across all strands of this service.
30   30  
31   The dispatch helpers (`enqueue`, `dispatch_pending`, etc.) are 31   The dispatch helpers (`enqueue`, `dispatch_pending`, etc.) are
32   public so the namespace-scope `make_invoker` coroutine and the 32   public so the namespace-scope `make_invoker` coroutine and the
33   `strand_service` static methods can call them without friendship. 33   `strand_service` static methods can call them without friendship.
34   */ 34   */
35   class strand_service_impl : public strand_service 35   class strand_service_impl : public strand_service
36   { 36   {
37   public: 37   public:
38   static constexpr std::size_t num_mutexes = 193; 38   static constexpr std::size_t num_mutexes = 193;
39   39  
40   std::mutex mutex_; 40   std::mutex mutex_;
41   std::size_t salt_ = 0; 41   std::size_t salt_ = 0;
42   std::shared_ptr<std::mutex> mutexes_[num_mutexes]; 42   std::shared_ptr<std::mutex> mutexes_[num_mutexes];
43   intrusive_list<strand_impl> impl_list_; 43   intrusive_list<strand_impl> impl_list_;
44   std::atomic<void*> invoker_frame_cache_{nullptr}; 44   std::atomic<void*> invoker_frame_cache_{nullptr};
45   45  
46   explicit 46   explicit
HITCBC 47   30 strand_service_impl(execution_context&) 47   30 strand_service_impl(execution_context&)
HITCBC 48   30 { 48   30 {
HITCBC 49   30 } 49   30 }
50   50  
51   std::shared_ptr<strand_impl> 51   std::shared_ptr<strand_impl>
HITCBC 52   11442 create_implementation() override 52   11442 create_implementation() override
53   { 53   {
HITCBC 54   11442 auto new_impl = std::make_shared<strand_impl>(); 54   11442 auto new_impl = std::make_shared<strand_impl>();
55   55  
HITCBC 56   11442 std::lock_guard<std::mutex> lock(mutex_); 56   11442 std::lock_guard<std::mutex> lock(mutex_);
57   57  
HITCBC 58   11442 std::size_t s = salt_++; 58   11442 std::size_t s = salt_++;
HITCBC 59   11442 std::size_t idx = reinterpret_cast<std::size_t>(new_impl.get()); 59   11442 std::size_t idx = reinterpret_cast<std::size_t>(new_impl.get());
HITCBC 60   11442 idx += idx >> 3; 60   11442 idx += idx >> 3;
HITCBC 61   11442 idx ^= s + 0x9e3779b9 + (idx << 6) + (idx >> 2); 61   11442 idx ^= s + 0x9e3779b9 + (idx << 6) + (idx >> 2);
HITCBC 62   11442 idx %= num_mutexes; 62   11442 idx %= num_mutexes;
HITCBC 63   11442 if(!mutexes_[idx]) 63   11442 if(!mutexes_[idx])
HITCBC 64   674 mutexes_[idx] = std::make_shared<std::mutex>(); 64   665 mutexes_[idx] = std::make_shared<std::mutex>();
HITCBC 65   11442 new_impl->mutex_ = mutexes_[idx].get(); 65   11442 new_impl->mutex_ = mutexes_[idx].get();
66   66  
HITCBC 67   11442 impl_list_.push_back(new_impl.get()); 67   11442 impl_list_.push_back(new_impl.get());
HITCBC 68   11442 new_impl->service_.store(this, std::memory_order_release); 68   11442 new_impl->service_.store(this, std::memory_order_release);
69   69  
HITCBC 70   22884 return new_impl; 70   22884 return new_impl;
HITCBC 71   11442 } 71   11442 }
72   72  
73   static bool 73   static bool
HITCBC 74 - 30340 enqueue(strand_impl& impl, std::coroutine_handle<> h) 74 + 30340 enqueue(strand_impl& impl, continuation& c)
75   { 75   {
HITCBC 76   30340 std::lock_guard<std::mutex> lock(*impl.mutex_); 76   30340 std::lock_guard<std::mutex> lock(*impl.mutex_);
HITCBC 77 - 30340 impl.pending_.push(h); 77 + 30340 impl.pending_.push(c);
HITCBC 78   30340 if(!impl.locked_) 78   30340 if(!impl.locked_)
79   { 79   {
HITCBC 80   19316 impl.locked_ = true; 80   19800 impl.locked_ = true;
HITCBC 81   19316 return true; 81   19800 return true;
82   } 82   }
HITCBC 83   11024 return false; 83   10540 return false;
HITCBC 84   30340 } 84   30340 }
85   85  
86   static void 86   static void
HITCBC 87   20319 dispatch_pending(strand_impl& impl) 87   20284 dispatch_pending(strand_impl& impl)
88   { 88   {
HITCBC 89   20319 strand_queue::taken_batch batch; 89   20284 strand_queue::taken_batch batch;
90   { 90   {
HITCBC 91   20319 std::lock_guard<std::mutex> lock(*impl.mutex_); 91   20284 std::lock_guard<std::mutex> lock(*impl.mutex_);
HITCBC 92   20319 batch = impl.pending_.take_all(); 92   20284 batch = impl.pending_.take_all();
HITCBC 93   20319 } 93   20284 }
HITCBC 94   20319 impl.pending_.dispatch_batch(batch); 94   20284 impl.pending_.dispatch_batch(batch);
HITCBC 95   20319 } 95   20284 }
96   96  
97   static bool 97   static bool
HITCBC 98   20319 try_unlock(strand_impl& impl) 98   20284 try_unlock(strand_impl& impl)
99   { 99   {
HITCBC 100   20319 std::lock_guard<std::mutex> lock(*impl.mutex_); 100   20284 std::lock_guard<std::mutex> lock(*impl.mutex_);
HITCBC 101   20319 if(impl.pending_.empty()) 101   20284 if(impl.pending_.empty())
102   { 102   {
HITCBC 103   19316 impl.locked_ = false; 103   19800 impl.locked_ = false;
HITCBC 104   19316 return true; 104   19800 return true;
105   } 105   }
HITCBC 106   1003 return false; 106   484 return false;
HITCBC 107   20319 } 107   20284 }
108   108  
109   static void 109   static void
HITCBC 110   20319 set_dispatch_thread(strand_impl& impl) noexcept 110   20284 set_dispatch_thread(strand_impl& impl) noexcept
111   { 111   {
HITCBC 112   20319 impl.dispatch_thread_.store(std::this_thread::get_id()); 112   20284 impl.dispatch_thread_.store(std::this_thread::get_id());
HITCBC 113   20319 } 113   20284 }
114   114  
115   static void 115   static void
HITCBC 116   19316 clear_dispatch_thread(strand_impl& impl) noexcept 116   19800 clear_dispatch_thread(strand_impl& impl) noexcept
117   { 117   {
HITCBC 118   19316 impl.dispatch_thread_.store(std::thread::id{}); 118   19800 impl.dispatch_thread_.store(std::thread::id{});
HITCBC 119   19316 } 119   19800 }
120   120  
121   // Defined below; needs strand_invoker complete. 121   // Defined below; needs strand_invoker complete.
122   static void 122   static void
123   post_invoker(std::shared_ptr<strand_impl> impl, executor_ref ex); 123   post_invoker(std::shared_ptr<strand_impl> impl, executor_ref ex);
124   124  
125   protected: 125   protected:
126   void 126   void
HITCBC 127   30 shutdown() override 127   30 shutdown() override
128   { 128   {
HITCBC 129   30 std::lock_guard<std::mutex> lock(mutex_); 129   30 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 130   30 while(auto* p = impl_list_.pop_front()) 130   30 while(auto* p = impl_list_.pop_front())
131   { 131   {
MISUBC 132   std::lock_guard<std::mutex> impl_lock(*p->mutex_); 132   std::lock_guard<std::mutex> impl_lock(*p->mutex_);
MISUBC 133   p->locked_ = true; 133   p->locked_ = true;
MISUBC 134   p->service_.store(nullptr, std::memory_order_release); 134   p->service_.store(nullptr, std::memory_order_release);
MISUBC 135   } 135   }
136   136  
HITCBC 137   30 void* fp = invoker_frame_cache_.exchange( 137   30 void* fp = invoker_frame_cache_.exchange(
138   kCacheClosed, std::memory_order_acq_rel); 138   kCacheClosed, std::memory_order_acq_rel);
HITCBC 139   30 if(fp) ::operator delete(fp); 139   30 if(fp) ::operator delete(fp);
HITCBC 140   30 } 140   30 }
141   }; 141   };
142   142  
143   /** Invoker coroutine that drains a strand's pending queue. 143   /** Invoker coroutine that drains a strand's pending queue.
144   144  
145   Runs once the strand transitions from unlocked to locked. Holds 145   Runs once the strand transitions from unlocked to locked. Holds
146   the impl alive via the coroutine parameter (a shared_ptr in the 146   the impl alive via the coroutine parameter (a shared_ptr in the
147   coroutine frame), so user code may drop its strand handle while 147   coroutine frame), so user code may drop its strand handle while
148   the invoker is mid-flight. 148   the invoker is mid-flight.
149   149  
150   The frame's allocator recycles a single per-service slot. The 150   The frame's allocator recycles a single per-service slot. The
151   trailer points at the service (lifetime: execution_context), 151   trailer points at the service (lifetime: execution_context),
152   NOT the impl (lifetime: per-strand), so operator delete is 152   NOT the impl (lifetime: per-strand), so operator delete is
153   safe even after the impl has been destroyed. 153   safe even after the impl has been destroyed.
154   */ 154   */
155   struct strand_invoker 155   struct strand_invoker
156   { 156   {
157   struct promise_type 157   struct promise_type
158   { 158   {
159   // Stored in the coroutine frame so its address is stable for 159   // Stored in the coroutine frame so its address is stable for
160   // posting to the inner executor. 160   // posting to the inner executor.
161   continuation self_; 161   continuation self_;
162   162  
163   void* 163   void*
HITCBC 164   19316 operator new( 164   19800 operator new(
165   std::size_t n, 165   std::size_t n,
166   std::shared_ptr<strand_impl> const& impl) 166   std::shared_ptr<strand_impl> const& impl)
167   { 167   {
HITCBC 168   19316 auto* svc = impl->service_.load(std::memory_order_acquire); 168   19800 auto* svc = impl->service_.load(std::memory_order_acquire);
HITCBC 169   19316 constexpr auto A = alignof(strand_service_impl*); 169   19800 constexpr auto A = alignof(strand_service_impl*);
HITCBC 170   19316 std::size_t padded = (n + A - 1) & ~(A - 1); 170   19800 std::size_t padded = (n + A - 1) & ~(A - 1);
HITCBC 171   19316 std::size_t total = padded + sizeof(strand_service_impl*); 171   19800 std::size_t total = padded + sizeof(strand_service_impl*);
172   172  
HITCBC 173   19316 void* p = svc->invoker_frame_cache_.exchange( 173   19800 void* p = svc->invoker_frame_cache_.exchange(
174   nullptr, std::memory_order_acquire); 174   nullptr, std::memory_order_acquire);
HITCBC 175   19316 if(!p || p == kCacheClosed) 175   19800 if(!p || p == kCacheClosed)
HITCBC 176   8561 p = ::operator new(total); 176   8072 p = ::operator new(total);
177   177  
HITCBC 178   19316 *reinterpret_cast<strand_service_impl**>( 178   19800 *reinterpret_cast<strand_service_impl**>(
HITCBC 179   19316 static_cast<char*>(p) + padded) = svc; 179   19800 static_cast<char*>(p) + padded) = svc;
HITCBC 180   19316 return p; 180   19800 return p;
181   } 181   }
182   182  
183   void 183   void
HITCBC 184   19316 operator delete(void* p, std::size_t n) noexcept 184   19800 operator delete(void* p, std::size_t n) noexcept
185   { 185   {
HITCBC 186   19316 constexpr auto A = alignof(strand_service_impl*); 186   19800 constexpr auto A = alignof(strand_service_impl*);
HITCBC 187   19316 std::size_t padded = (n + A - 1) & ~(A - 1); 187   19800 std::size_t padded = (n + A - 1) & ~(A - 1);
HITCBC 188   19316 auto* svc = *reinterpret_cast<strand_service_impl**>( 188   19800 auto* svc = *reinterpret_cast<strand_service_impl**>(
189   static_cast<char*>(p) + padded); 189   static_cast<char*>(p) + padded);
190   190  
HITCBC 191   19316 void* expected = nullptr; 191   19800 void* expected = nullptr;
HITCBC 192   19316 if(!svc->invoker_frame_cache_.compare_exchange_strong( 192   19800 if(!svc->invoker_frame_cache_.compare_exchange_strong(
193   expected, p, std::memory_order_release)) 193   expected, p, std::memory_order_release))
HITCBC 194   8544 ::operator delete(p); 194   8055 ::operator delete(p);
HITCBC 195   19316 } 195   19800 }
196   196  
197   strand_invoker 197   strand_invoker
HITCBC 198   19316 get_return_object() noexcept 198   19800 get_return_object() noexcept
199   { 199   {
HITCBC 200   19316 return {std::coroutine_handle<promise_type>::from_promise(*this)}; 200   19800 return {std::coroutine_handle<promise_type>::from_promise(*this)};
201   } 201   }
202   202  
HITCBC 203   19316 std::suspend_always initial_suspend() noexcept { return {}; } 203   19800 std::suspend_always initial_suspend() noexcept { return {}; }
HITCBC 204   19316 std::suspend_never final_suspend() noexcept { return {}; } 204   19800 std::suspend_never final_suspend() noexcept { return {}; }
HITCBC 205   19316 void return_void() noexcept {} 205   19800 void return_void() noexcept {}
MISUBC 206   void unhandled_exception() { std::terminate(); } 206   void unhandled_exception() { std::terminate(); }
207   }; 207   };
208   208  
209   std::coroutine_handle<promise_type> h_; 209   std::coroutine_handle<promise_type> h_;
210   }; 210   };
211   211  
212   // The by-value parameter lives in the coroutine frame for the 212   // The by-value parameter lives in the coroutine frame for the
213   // invoker's lifetime, keeping the impl alive past any user-side 213   // invoker's lifetime, keeping the impl alive past any user-side
214   // strand drop. 214   // strand drop.
215   static 215   static
216   strand_invoker 216   strand_invoker
HITCBC 217   19316 make_invoker(std::shared_ptr<strand_impl> impl) 217   19800 make_invoker(std::shared_ptr<strand_impl> impl)
218   { 218   {
219   auto* p = impl.get(); 219   auto* p = impl.get();
220   for(;;) 220   for(;;)
221   { 221   {
222   strand_service_impl::set_dispatch_thread(*p); 222   strand_service_impl::set_dispatch_thread(*p);
223   strand_service_impl::dispatch_pending(*p); 223   strand_service_impl::dispatch_pending(*p);
224   if(strand_service_impl::try_unlock(*p)) 224   if(strand_service_impl::try_unlock(*p))
225   { 225   {
226   strand_service_impl::clear_dispatch_thread(*p); 226   strand_service_impl::clear_dispatch_thread(*p);
227   co_return; 227   co_return;
228   } 228   }
229   } 229   }
HITCBC 230   38632 } 230   39600 }
231   231  
232   void 232   void
HITCBC 233   19316 strand_service_impl::post_invoker( 233   19800 strand_service_impl::post_invoker(
234   std::shared_ptr<strand_impl> impl, 234   std::shared_ptr<strand_impl> impl,
235   executor_ref ex) 235   executor_ref ex)
236   { 236   {
HITCBC 237   19316 auto invoker = make_invoker(std::move(impl)); 237   19800 auto invoker = make_invoker(std::move(impl));
HITCBC 238   19316 auto& self = invoker.h_.promise().self_; 238   19800 auto& self = invoker.h_.promise().self_;
HITCBC 239   19316 self.h = invoker.h_; 239   19800 self.h = invoker.h_;
HITCBC 240   19316 ex.post(self); 240   19800 ex.post(self);
HITCBC 241   19316 } 241   19800 }
242   242  
HITCBC 243   22884 strand_impl::~strand_impl() 243   22884 strand_impl::~strand_impl()
244   { 244   {
HITCBC 245   11442 auto* svc = service_.load(std::memory_order_acquire); 245   11442 auto* svc = service_.load(std::memory_order_acquire);
HITCBC 246   11442 if(!svc) return; 246   11442 if(!svc) return;
HITCBC 247   11442 std::lock_guard<std::mutex> lock(svc->mutex_); 247   11442 std::lock_guard<std::mutex> lock(svc->mutex_);
HITCBC 248   11442 svc->impl_list_.remove(this); 248   11442 svc->impl_list_.remove(this);
HITCBC 249   11442 } 249   11442 }
250   250  
HITCBC 251   30 strand_service:: 251   30 strand_service::
HITCBC 252   30 strand_service() 252   30 strand_service()
HITCBC 253   30 : service() 253   30 : service()
254   { 254   {
HITCBC 255   30 } 255   30 }
256   256  
HITCBC 257   30 strand_service:: 257   30 strand_service::
258   ~strand_service() = default; 258   ~strand_service() = default;
259   259  
260   bool 260   bool
HITCBC 261   12 strand_service:: 261   12 strand_service::
262   running_in_this_thread(strand_impl& impl) noexcept 262   running_in_this_thread(strand_impl& impl) noexcept
263   { 263   {
HITCBC 264   12 return impl.dispatch_thread_.load() == std::this_thread::get_id(); 264   12 return impl.dispatch_thread_.load() == std::this_thread::get_id();
265   } 265   }
266   266  
267   std::coroutine_handle<> 267   std::coroutine_handle<>
HITCBC 268   8 strand_service:: 268   8 strand_service::
269   dispatch( 269   dispatch(
270   std::shared_ptr<strand_impl> const& impl, 270   std::shared_ptr<strand_impl> const& impl,
271   executor_ref ex, 271   executor_ref ex,
272 - std::coroutine_handle<> h) 272 + continuation& c)
273   { 273   {
HITCBC 274   8 if(running_in_this_thread(*impl)) 274   8 if(running_in_this_thread(*impl))
HITCBC 275 - 3 return h; 275 + 3 return c.h;
276   276  
HITCBC 277 - 5 if(strand_service_impl::enqueue(*impl, h)) 277 + 5 if(strand_service_impl::enqueue(*impl, c))
HITCBC 278   5 strand_service_impl::post_invoker(impl, ex); 278   5 strand_service_impl::post_invoker(impl, ex);
HITCBC 279   5 return std::noop_coroutine(); 279   5 return std::noop_coroutine();
280   } 280   }
281   281  
282   void 282   void
HITCBC 283   30335 strand_service:: 283   30335 strand_service::
284   post( 284   post(
285   std::shared_ptr<strand_impl> const& impl, 285   std::shared_ptr<strand_impl> const& impl,
286   executor_ref ex, 286   executor_ref ex,
287 - std::coroutine_handle<> h) 287 + continuation& c)
288   { 288   {
HITCBC 289 - 30335 if(strand_service_impl::enqueue(*impl, h)) 289 + 30335 if(strand_service_impl::enqueue(*impl, c))
HITCBC 290   19311 strand_service_impl::post_invoker(impl, ex); 290   19795 strand_service_impl::post_invoker(impl, ex);
HITCBC 291   30335 } 291   30335 }
292   292  
293   strand_service& 293   strand_service&
HITCBC 294   11442 get_strand_service(execution_context& ctx) 294   11442 get_strand_service(execution_context& ctx)
295   { 295   {
HITCBC 296   11442 return ctx.use_service<strand_service_impl>(); 296   11442 return ctx.use_service<strand_service_impl>();
297   } 297   }
298   298  
299   } // namespace detail 299   } // namespace detail
300   } // namespace capy 300   } // namespace capy
301   } // namespace boost 301   } // namespace boost