LCOV - code coverage report
Current view: top level - /jenkins/workspace/boost-root/libs/capy/src/ex - thread_pool.cpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 89.7 % 78 70 8
Test Date: 2026-02-17 18:14:47 Functions: 87.5 % 16 14 2

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : // Copyright (c) 2026 Michael Vandeberg
       4                 : //
       5                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       6                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       7                 : //
       8                 : // Official repository: https://github.com/boostorg/capy
       9                 : //
      10                 : 
      11                 : #include <boost/capy/ex/thread_pool.hpp>
      12                 : #include <boost/capy/detail/intrusive.hpp>
      13                 : #include <boost/capy/test/thread_name.hpp>
      14                 : #include <atomic>
      15                 : #include <condition_variable>
      16                 : #include <cstdio>
      17                 : #include <mutex>
      18                 : #include <thread>
      19                 : #include <vector>
      20                 : 
      21                 : /*
      22                 :     Thread pool implementation using a shared work queue.
      23                 : 
      24                 :     Work items are coroutine handles wrapped in intrusive list nodes, stored
      25                 :     in a single queue protected by a mutex. Worker threads wait on a
      26                 :     condition_variable until work is available or stop is requested.
      27                 : 
      28                 :     Threads are started lazily on first post() via std::call_once to avoid
      29                 :     spawning threads for pools that are constructed but never used. Each
      30                 :     thread is named with a configurable prefix plus index for debugger
      31                 :     visibility.
      32                 : 
      33                 :     Shutdown sequence: stop() sets the stop flag and notifies all threads;
      34                 :     workers drain the queue before exiting. The destructor then joins them
      35                 :     and destroys any work still queued without executing it.
      36                 : */
      37                 : 
      38                 : namespace boost {
      39                 : namespace capy {
      40                 : 
      41                 : //------------------------------------------------------------------------------
      42                 : 
      43                 : class thread_pool::impl
      44                 : {
      45                 :     struct work : detail::intrusive_queue<work>::node
      46                 :     {
      47                 :         std::coroutine_handle<> h_;
      48                 : 
      49 HIT         127 :         explicit work(std::coroutine_handle<> h) noexcept
      50             127 :             : h_(h)
      51                 :         {
      52             127 :         }
      53                 : 
      54             127 :         void run()
      55                 :         {
      56             127 :             auto h = h_;
      57             127 :             delete this;
      58             127 :             h.resume();
      59             127 :         }
      60                 : 
      61 MIS           0 :         void destroy()
      62                 :         {
      63               0 :             delete this;
      64               0 :         }
      65                 :     };
      66                 : 
      67                 :     std::mutex mutex_;
      68                 :     std::condition_variable cv_;
      69                 :     detail::intrusive_queue<work> q_;
      70                 :     std::vector<std::thread> threads_;
      71                 :     std::atomic<bool> stop_{false};
      72                 :     std::size_t num_threads_;
      73                 :     char thread_name_prefix_[13]{};  // 12 chars max + null terminator
      74                 :     std::once_flag start_flag_;
      75                 : 
      76                 : public:
      77 HIT          61 :     ~impl()
      78                 :     {
      79              61 :         stop();
      80              99 :         for(auto& t : threads_)
      81              38 :             if(t.joinable())
      82              38 :                 t.join();
      83                 : 
      84              61 :         while(auto* w = q_.pop())
      85 MIS           0 :             w->destroy();
      86 HIT          61 :     }
      87                 : 
      88              61 :     impl(std::size_t num_threads, std::string_view thread_name_prefix)
      89              61 :         : num_threads_(num_threads)
      90                 :     {
      91              61 :         if(num_threads_ == 0)
      92               2 :             num_threads_ = std::thread::hardware_concurrency();
      93              61 :         if(num_threads_ == 0)
      94 MIS           0 :             num_threads_ = 1;
      95                 : 
      96                 :         // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
      97 HIT          61 :         auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
      98              61 :         thread_name_prefix_[n] = '\0';
      99              61 :     }
     100                 : 
     101                 :     void
     102             127 :     post(std::coroutine_handle<> h)
     103                 :     {
     104             127 :         ensure_started();
     105             127 :         auto* w = new work(h);
     106                 :         {
     107             127 :             std::lock_guard<std::mutex> lock(mutex_);
     108             127 :             q_.push(w);
     109             127 :         }
     110             127 :         cv_.notify_one();
     111             127 :     }
     112                 : 
     113                 :     void
     114              61 :     stop() noexcept
     115                 :     {
     116              61 :         stop_.store(true, std::memory_order_release);
     117              61 :         cv_.notify_all();
     118              61 :     }
     119                 : 
     120                 : private:
     121                 :     void
     122             127 :     ensure_started()
     123                 :     {
     124             127 :         std::call_once(start_flag_, [this]{
     125              22 :             threads_.reserve(num_threads_);
     126              60 :             for(std::size_t i = 0; i < num_threads_; ++i)
     127              76 :                 threads_.emplace_back([this, i]{ run(i); });
     128              22 :         });
     129             127 :     }
     130                 : 
     131                 :     void
     132              38 :     run(std::size_t index)
     133                 :     {
     134                 :         // Build name; set_current_thread_name truncates to platform limits.
     135                 :         char name[16];
     136              38 :         std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
     137              38 :         set_current_thread_name(name);
     138                 : 
     139                 :         for(;;)
     140                 :         {
     141             165 :             work* w = nullptr;
     142                 :             {
     143             165 :                 std::unique_lock<std::mutex> lock(mutex_);
     144             165 :                 cv_.wait(lock, [this]{
     145             297 :                     return !q_.empty() ||
     146             297 :                         stop_.load(std::memory_order_acquire);
     147                 :                 });
     148             165 :                 if(stop_.load(std::memory_order_acquire) && q_.empty())
     149              76 :                     return;
     150             127 :                 w = q_.pop();
     151             165 :             }
     152             127 :             if(w)
     153             127 :                 w->run();
     154             127 :         }
     155                 :     }
     156                 : };
     157                 : 
     158                 : //------------------------------------------------------------------------------
     159                 : 
     160              61 : thread_pool::
     161                 : ~thread_pool()
     162                 : {
     163              61 :     shutdown();
     164              61 :     destroy();
     165              61 :     delete impl_;
     166              61 : }
     167                 : 
     168              61 : thread_pool::
     169              61 : thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
     170              61 :     : impl_(new impl(num_threads, thread_name_prefix))
     171                 : {
     172              61 :     this->set_frame_allocator(std::allocator<void>{});
     173              61 : }
     174                 : 
     175                 : void
     176 MIS           0 : thread_pool::
     177                 : stop() noexcept
     178                 : {
     179               0 :     impl_->stop();
     180               0 : }
     181                 : 
     182                 : //------------------------------------------------------------------------------
     183                 : 
     184                 : void
     185 HIT         127 : thread_pool::executor_type::
     186                 : post(std::coroutine_handle<> h) const
     187                 : {
     188             127 :     pool_->impl_->post(h);
     189             127 : }
     190                 : 
     191                 : } // capy
     192                 : } // boost
        

Generated by: LCOV version 2.3