LCOV - code coverage report
Current view: top level - /jenkins/workspace/boost-root/libs/capy/src/ex/detail - strand_service.cpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 96.7 % 91 88 3
Test Date: 2026-02-17 18:14:47 Functions: 91.3 % 23 21 2

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/capy
       8                 : //
       9                 : 
      10                 : #include "src/ex/detail/strand_queue.hpp"
      11                 : #include <boost/capy/ex/detail/strand_service.hpp>
      12                 : #include <atomic>
      13                 : #include <coroutine>
      14                 : #include <mutex>
      15                 : #include <thread>
      16                 : #include <utility>
      17                 : 
      18                 : namespace boost {
      19                 : namespace capy {
      20                 : namespace detail {
      21                 : 
      22                 : //----------------------------------------------------------
      23                 : 
      24                 : /** Implementation state for a strand.
      25                 : 
      26                 :     Each strand_impl provides serialization for coroutines
      27                 :     dispatched through strands that share it.
      28                 : */
      29                 : struct strand_impl
      30                 : {
      31                 :     std::mutex mutex_;
      32                 :     strand_queue pending_;
      33                 :     bool locked_ = false;
      34                 :     std::atomic<std::thread::id> dispatch_thread_{};
      35                 :     void* cached_frame_ = nullptr;
      36                 : };
      37                 : 
      38                 : //----------------------------------------------------------
      39                 : 
      40                 : /** Invoker coroutine for strand dispatch.
      41                 : 
      42                 :     Uses custom allocator to recycle frame - one allocation
      43                 :     per strand_impl lifetime, stored in trailer for recovery.
      44                 : */
      45                 : struct strand_invoker
      46                 : {
      47                 :     struct promise_type
      48                 :     {
      49 HIT           9 :         void* operator new(std::size_t n, strand_impl& impl)
      50                 :         {
      51               9 :             constexpr auto A = alignof(strand_impl*);
      52               9 :             std::size_t padded = (n + A - 1) & ~(A - 1);
      53               9 :             std::size_t total = padded + sizeof(strand_impl*);
      54                 : 
      55               9 :             void* p = impl.cached_frame_
      56               9 :                 ? std::exchange(impl.cached_frame_, nullptr)
      57               7 :                 : ::operator new(total);
      58                 : 
      59                 :             // Trailer lets delete recover impl
      60               9 :             *reinterpret_cast<strand_impl**>(
      61               9 :                 static_cast<char*>(p) + padded) = &impl;
      62               9 :             return p;
      63                 :         }
      64                 : 
      65               9 :         void operator delete(void* p, std::size_t n) noexcept
      66                 :         {
      67               9 :             constexpr auto A = alignof(strand_impl*);
      68               9 :             std::size_t padded = (n + A - 1) & ~(A - 1);
      69                 : 
      70               9 :             auto* impl = *reinterpret_cast<strand_impl**>(
      71                 :                 static_cast<char*>(p) + padded);
      72                 : 
      73               9 :             if (!impl->cached_frame_)
      74               9 :                 impl->cached_frame_ = p;
      75                 :             else
      76 MIS           0 :                 ::operator delete(p);
      77 HIT           9 :         }
      78                 : 
      79               9 :         strand_invoker get_return_object() noexcept
      80               9 :         { return {std::coroutine_handle<promise_type>::from_promise(*this)}; }
      81                 : 
      82               9 :         std::suspend_always initial_suspend() noexcept { return {}; }
      83               9 :         std::suspend_never final_suspend() noexcept { return {}; }
      84               9 :         void return_void() noexcept {}
      85 MIS           0 :         void unhandled_exception() { std::terminate(); }
      86                 :     };
      87                 : 
      88                 :     std::coroutine_handle<promise_type> h_;
      89                 : };
      90                 : 
      91                 : //----------------------------------------------------------
      92                 : 
      93                 : /** Concrete implementation of strand_service.
      94                 : 
      95                 :     Holds the fixed pool of strand_impl objects.
      96                 : */
      97                 : class strand_service_impl : public strand_service
      98                 : {
      99                 :     static constexpr std::size_t num_impls = 211;
     100                 : 
     101                 :     strand_impl impls_[num_impls];
     102                 :     std::size_t salt_ = 0;
     103                 :     std::mutex mutex_;
     104                 : 
     105                 : public:
     106                 :     explicit
     107 HIT          19 :     strand_service_impl(execution_context&)
     108            4028 :     {
     109              19 :     }
     110                 : 
     111                 :     strand_impl*
     112              23 :     get_implementation() override
     113                 :     {
     114              23 :         std::lock_guard<std::mutex> lock(mutex_);
     115              23 :         std::size_t index = salt_++;
     116              23 :         index = index % num_impls;
     117              23 :         return &impls_[index];
     118              23 :     }
     119                 : 
     120                 : protected:
     121                 :     void
     122              19 :     shutdown() override
     123                 :     {
     124            4028 :         for(std::size_t i = 0; i < num_impls; ++i)
     125                 :         {
     126            4009 :             std::lock_guard<std::mutex> lock(impls_[i].mutex_);
     127            4009 :             impls_[i].locked_ = true;
     128                 : 
     129            4009 :             if(impls_[i].cached_frame_)
     130                 :             {
     131               7 :                 ::operator delete(impls_[i].cached_frame_);
     132               7 :                 impls_[i].cached_frame_ = nullptr;
     133                 :             }
     134            4009 :         }
     135              19 :     }
     136                 : 
     137                 : private:
     138                 :     static bool
     139             322 :     enqueue(strand_impl& impl, std::coroutine_handle<> h)
     140                 :     {
     141             322 :         std::lock_guard<std::mutex> lock(impl.mutex_);
     142             322 :         impl.pending_.push(h);
     143             322 :         if(!impl.locked_)
     144                 :         {
     145               9 :             impl.locked_ = true;
     146               9 :             return true;
     147                 :         }
     148             313 :         return false;
     149             322 :     }
     150                 : 
     151                 :     static void
     152              13 :     dispatch_pending(strand_impl& impl)
     153                 :     {
     154              13 :         strand_queue::taken_batch batch;
     155                 :         {
     156              13 :             std::lock_guard<std::mutex> lock(impl.mutex_);
     157              13 :             batch = impl.pending_.take_all();
     158              13 :         }
     159              13 :         impl.pending_.dispatch_batch(batch);
     160              13 :     }
     161                 : 
     162                 :     static bool
     163              13 :     try_unlock(strand_impl& impl)
     164                 :     {
     165              13 :         std::lock_guard<std::mutex> lock(impl.mutex_);
     166              13 :         if(impl.pending_.empty())
     167                 :         {
     168               9 :             impl.locked_ = false;
     169               9 :             return true;
     170                 :         }
     171               4 :         return false;
     172              13 :     }
     173                 : 
     174                 :     static void
     175              13 :     set_dispatch_thread(strand_impl& impl) noexcept
     176                 :     {
     177              13 :         impl.dispatch_thread_.store(std::this_thread::get_id());
     178              13 :     }
     179                 : 
     180                 :     static void
     181               9 :     clear_dispatch_thread(strand_impl& impl) noexcept
     182                 :     {
     183               9 :         impl.dispatch_thread_.store(std::thread::id{});
     184               9 :     }
     185                 : 
     186                 :     // Loops until queue empty (aggressive). Alternative: per-batch fairness
     187                 :     // (repost after each batch to let other work run) - explore if starvation observed.
     188                 :     static strand_invoker
     189               9 :     make_invoker(strand_impl& impl)
     190                 :     {
     191                 :         strand_impl* p = &impl;
     192                 :         for(;;)
     193                 :         {
     194                 :             set_dispatch_thread(*p);
     195                 :             dispatch_pending(*p);
     196                 :             if(try_unlock(*p))
     197                 :             {
     198                 :                 clear_dispatch_thread(*p);
     199                 :                 co_return;
     200                 :             }
     201                 :         }
     202              18 :     }
     203                 : 
     204                 :     friend class strand_service;
     205                 : };
     206                 : 
     207                 : //----------------------------------------------------------
     208                 : 
     209              19 : strand_service::
     210              19 : strand_service()
     211              19 :     : service()
     212                 : {
     213              19 : }
     214                 : 
     215              19 : strand_service::
     216                 : ~strand_service() = default;
     217                 : 
     218                 : bool
     219               2 : strand_service::
     220                 : running_in_this_thread(strand_impl& impl) noexcept
     221                 : {
     222               2 :     return impl.dispatch_thread_.load() == std::this_thread::get_id();
     223                 : }
     224                 : 
     225                 : std::coroutine_handle<>
     226               1 : strand_service::
     227                 : dispatch(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
     228                 : {
     229               1 :     if(running_in_this_thread(impl))
     230 MIS           0 :         return h;
     231                 : 
     232 HIT           1 :     if(strand_service_impl::enqueue(impl, h))
     233               1 :         ex.post(strand_service_impl::make_invoker(impl).h_);
     234               1 :     return std::noop_coroutine();
     235                 : }
     236                 : 
     237                 : void
     238             321 : strand_service::
     239                 : post(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
     240                 : {
     241             321 :     if(strand_service_impl::enqueue(impl, h))
     242               8 :         ex.post(strand_service_impl::make_invoker(impl).h_);
     243             321 : }
     244                 : 
     245                 : strand_service&
     246              23 : get_strand_service(execution_context& ctx)
     247                 : {
     248              23 :     return ctx.use_service<strand_service_impl>();
     249                 : }
     250                 : 
     251                 : } // namespace detail
     252                 : } // namespace capy
     253                 : } // namespace boost
        

Generated by: LCOV version 2.3