LCOV - code coverage report
Current view: top level - /jenkins/workspace/boost-root/libs/capy/src/ex/detail - strand_queue.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 74.2 % 66 49 17
Test Date: 2026-02-17 18:14:47 Functions: 85.7 % 14 12 2

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/capy
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
      11                 : #define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
      12                 : 
      13                 : #include <boost/capy/detail/config.hpp>
      14                 : 
      15                 : #include <coroutine>
      16                 : #include <cstddef>
      17                 : #include <exception>
      18                 : 
      19                 : namespace boost {
      20                 : namespace capy {
      21                 : namespace detail {
      22                 : 
      23                 : class strand_queue;
      24                 : 
      25                 : //----------------------------------------------------------
      26                 : 
      27                 : // Metadata stored before the coroutine frame
      28                 : struct frame_prefix
      29                 : {
      30                 :     frame_prefix* next;
      31                 :     strand_queue* queue;
      32                 :     std::size_t alloc_size;
      33                 : };
      34                 : 
      35                 : //----------------------------------------------------------
      36                 : 
      37                 : /** Wrapper coroutine for strand queue dispatch operations.
      38                 : 
      39                 :     This coroutine wraps a target coroutine handle and resumes
      40                 :     it when dispatched. The wrapper ensures control returns to
      41                 :     the dispatch loop after the target suspends or completes.
      42                 : 
      43                 :     The promise contains an intrusive list node for queue
      44                 :     storage and supports a custom allocator that recycles
      45                 :     coroutine frames via a free list.
      46                 : */
      47                 : struct strand_op
      48                 : {
      49                 :     struct promise_type
      50                 :     {
      51                 :         promise_type* next = nullptr;
      52                 : 
      53                 :         void*
      54                 :         operator new(
      55                 :             std::size_t size,
      56                 :             strand_queue& q,
      57                 :             std::coroutine_handle<void>);
      58                 : 
      59                 :         void
      60                 :         operator delete(void* p, std::size_t);
      61                 : 
      62                 :         strand_op
      63 HIT         322 :         get_return_object() noexcept
      64                 :         {
      65             322 :             return {std::coroutine_handle<promise_type>::from_promise(*this)};
      66                 :         }
      67                 : 
      68                 :         std::suspend_always
      69             322 :         initial_suspend() noexcept
      70                 :         {
      71             322 :             return {};
      72                 :         }
      73                 : 
      74                 :         std::suspend_always
      75             322 :         final_suspend() noexcept
      76                 :         {
      77             322 :             return {};
      78                 :         }
      79                 : 
      80                 :         void
      81             322 :         return_void() noexcept
      82                 :         {
      83             322 :         }
      84                 : 
      85                 :         void
      86 MIS           0 :         unhandled_exception()
      87                 :         {
      88               0 :             std::terminate();
      89                 :         }
      90                 :     };
      91                 : 
      92                 :     std::coroutine_handle<promise_type> h_;
      93                 : };
      94                 : 
      95                 : //----------------------------------------------------------
      96                 : 
      97                 : /** Single-threaded dispatch queue for coroutine handles.
      98                 : 
      99                 :     This queue stores coroutine handles and resumes them
     100                 :     sequentially when dispatch() is called. Each pushed
     101                 :     handle is wrapped in a strand_op coroutine that ensures
     102                 :     control returns to the dispatch loop after the target
     103                 :     suspends or completes.
     104                 : 
     105                 :     The queue uses an intrusive singly-linked list through
     106                 :     the promise type to avoid separate node allocations.
     107                 :     A free list recycles wrapper coroutine frames to reduce
     108                 :     allocation overhead during repeated push/dispatch cycles.
     109                 : 
     110                 :     @par Thread Safety
     111                 :     This class is not thread-safe. All operations must be
     112                 :     called from a single thread.
     113                 : */
     114                 : class strand_queue
     115                 : {
     116                 :     using promise_type = strand_op::promise_type;
     117                 : 
     118                 :     promise_type* head_ = nullptr;
     119                 :     promise_type* tail_ = nullptr;
     120                 :     frame_prefix* free_list_ = nullptr;
     121                 : 
     122                 :     friend struct strand_op::promise_type;
     123                 : 
     124                 :     static
     125                 :     strand_op
     126 HIT         322 :     make_strand_op(
     127                 :         strand_queue& q,
     128                 :         std::coroutine_handle<void> target)
     129                 :     {
     130                 :         (void)q;
     131                 :         target.resume();
     132                 :         co_return;
     133             644 :     }
     134                 : 
     135                 : public:
     136            4009 :     strand_queue() = default;
     137                 : 
     138                 :     strand_queue(strand_queue const&) = delete;
     139                 :     strand_queue& operator=(strand_queue const&) = delete;
     140                 : 
     141                 :     /** Destructor.
     142                 : 
     143                 :         Destroys any pending wrappers without resuming them,
     144                 :         then frees all memory in the free list.
     145                 :     */
     146            4009 :     ~strand_queue()
     147                 :     {
     148                 :         // Destroy pending wrappers
     149            4009 :         while(head_)
     150                 :         {
     151 MIS           0 :             promise_type* p = head_;
     152               0 :             head_ = p->next;
     153                 : 
     154               0 :             auto h = std::coroutine_handle<promise_type>::from_promise(*p);
     155               0 :             h.destroy();
     156                 :         }
     157                 : 
     158                 :         // Free the free list memory
     159 HIT        4009 :         while(free_list_)
     160                 :         {
     161 MIS           0 :             frame_prefix* prefix = free_list_;
     162               0 :             free_list_ = prefix->next;
     163               0 :             ::operator delete(prefix);
     164                 :         }
     165 HIT        4009 :     }
     166                 : 
     167                 :     /** Returns true if there are no pending operations.
     168                 :     */
     169                 :     bool
     170              13 :     empty() const noexcept
     171                 :     {
     172              13 :         return head_ == nullptr;
     173                 :     }
     174                 : 
     175                 :     /** Push a coroutine handle to the queue.
     176                 : 
     177                 :         Creates a wrapper coroutine and appends it to the
     178                 :         queue. The wrapper will resume the target handle
     179                 :         when dispatch() processes it.
     180                 : 
     181                 :         @param h The coroutine handle to dispatch.
     182                 :     */
     183                 :     void
     184             322 :     push(std::coroutine_handle<void> h)
     185                 :     {
     186             322 :         strand_op op = make_strand_op(*this, h);
     187                 : 
     188             322 :         promise_type* p = &op.h_.promise();
     189             322 :         p->next = nullptr;
     190                 : 
     191             322 :         if(tail_)
     192             309 :             tail_->next = p;
     193                 :         else
     194              13 :             head_ = p;
     195             322 :         tail_ = p;
     196             322 :     }
     197                 : 
     198                 :     /** Resume all queued coroutines in sequence.
     199                 : 
     200                 :         Processes each wrapper in FIFO order, resuming its
     201                 :         target coroutine. After each target suspends or
     202                 :         completes, the wrapper is destroyed and its frame
     203                 :         is added to the free list for reuse.
     204                 : 
     205                 :         Coroutines resumed during dispatch may push new
     206                 :         handles, which will also be processed in the same
     207                 :         dispatch call.
     208                 : 
     209                 :         @warning Not thread-safe. Do not call while another
     210                 :             thread may be calling push().
     211                 :     */
     212                 :     void
     213                 :     dispatch()
     214                 :     {
     215                 :         while(head_)
     216                 :         {
     217                 :             promise_type* p = head_;
     218                 :             head_ = p->next;
     219                 :             if(!head_)
     220                 :                 tail_ = nullptr;
     221                 : 
     222                 :             auto h = std::coroutine_handle<promise_type>::from_promise(*p);
     223                 :             h.resume();
     224                 :             h.destroy();
     225                 :         }
     226                 :     }
     227                 : 
     228                 :     /** Batch of taken items for thread-safe dispatch. */
     229                 :     struct taken_batch
     230                 :     {
     231                 :         promise_type* head = nullptr;
     232                 :         promise_type* tail = nullptr;
     233                 :     };
     234                 : 
      235                 :     /** Take all pending items in one step.
      236                 : 
      237                 :         Detaches every item from the queue and returns them as a
      238                 :         batch. The queue is left empty. No locking is done here.
     239                 : 
     240                 :         @return The batch of taken items.
     241                 :     */
     242                 :     taken_batch
     243              13 :     take_all() noexcept
     244                 :     {
     245              13 :         taken_batch batch{head_, tail_};
     246              13 :         head_ = tail_ = nullptr;
     247              13 :         return batch;
     248                 :     }
     249                 : 
     250                 :     /** Dispatch a batch of taken items.
     251                 : 
     252                 :         @param batch The batch to dispatch.
     253                 : 
     254                 :         @note This is thread-safe w.r.t. push() because it doesn't
     255                 :             access the queue's free_list_. Frames are deleted directly
     256                 :             rather than recycled.
     257                 :     */
     258                 :     static
     259                 :     void
     260              13 :     dispatch_batch(taken_batch& batch)
     261                 :     {
     262             335 :         while(batch.head)
     263                 :         {
     264             322 :             promise_type* p = batch.head;
     265             322 :             batch.head = p->next;
     266                 : 
     267             322 :             auto h = std::coroutine_handle<promise_type>::from_promise(*p);
     268             322 :             h.resume();
     269                 :             // Don't use h.destroy() - it would call operator delete which
     270                 :             // accesses the queue's free_list_ (race with push).
     271                 :             // Instead, manually free the frame without recycling.
      272                 :             // This assumes h.address() is the pointer operator new returned.
     273             322 :             frame_prefix* prefix = static_cast<frame_prefix*>(h.address()) - 1;
     274             322 :             ::operator delete(prefix);
     275                 :         }
     276              13 :         batch.tail = nullptr;
     277              13 :     }
     278                 : };
     279                 : 
     280                 : //----------------------------------------------------------
     281                 : 
     282                 : inline
     283                 : void*
     284             322 : strand_op::promise_type::operator new(
     285                 :     std::size_t size,
     286                 :     strand_queue& q,
     287                 :     std::coroutine_handle<void>)
     288                 : {
     289                 :     // Total size includes prefix
     290             322 :     std::size_t alloc_size = size + sizeof(frame_prefix);
     291                 :     void* raw;
     292                 :     
     293                 :     // Try to reuse from free list
     294             322 :     if(q.free_list_)
     295                 :     {
     296 MIS           0 :         frame_prefix* prefix = q.free_list_;
     297               0 :         q.free_list_ = prefix->next;
     298               0 :         raw = prefix;
     299                 :     }
     300                 :     else
     301                 :     {
     302 HIT         322 :         raw = ::operator new(alloc_size);
     303                 :     }
     304                 : 
     305                 :     // Initialize prefix
     306             322 :     frame_prefix* prefix = static_cast<frame_prefix*>(raw);
     307             322 :     prefix->next = nullptr;
     308             322 :     prefix->queue = &q;
     309             322 :     prefix->alloc_size = alloc_size;
     310                 : 
     311                 :     // Return pointer AFTER the prefix (this is where coroutine frame goes)
     312             322 :     return prefix + 1;
     313                 : }
     314                 : 
     315                 : inline
     316                 : void
     317 MIS           0 : strand_op::promise_type::operator delete(void* p, std::size_t)
     318                 : {
     319                 :     // Calculate back to get the prefix
     320               0 :     frame_prefix* prefix = static_cast<frame_prefix*>(p) - 1;
     321                 : 
     322                 :     // Add to free list
     323               0 :     prefix->next = prefix->queue->free_list_;
     324               0 :     prefix->queue->free_list_ = prefix;
     325               0 : }
     326                 : 
     327                 : } // namespace detail
     328                 : } // namespace capy
     329                 : } // namespace boost
     330                 : 
     331                 : #endif
        

Generated by: LCOV version 2.3