LCOV - code coverage report
Current view: top level - capy - when_any.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 99.3 % 145 144 1
Test Date: 2026-02-17 18:14:47 Functions: 93.4 % 518 484 34

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Michael Vandeberg
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/capy
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_CAPY_WHEN_ANY_HPP
      11                 : #define BOOST_CAPY_WHEN_ANY_HPP
      12                 : 
      13                 : #include <boost/capy/detail/config.hpp>
      14                 : #include <boost/capy/concept/executor.hpp>
      15                 : #include <boost/capy/concept/io_awaitable.hpp>
      16                 : #include <coroutine>
      17                 : #include <boost/capy/ex/executor_ref.hpp>
      18                 : #include <boost/capy/ex/frame_allocator.hpp>
      19                 : #include <boost/capy/ex/io_env.hpp>
      20                 : #include <boost/capy/task.hpp>
      21                 : 
      22                 : #include <array>
      23                 : #include <atomic>
      24                 : #include <exception>
      25                 : #include <optional>
      26                 : #include <ranges>
      27                 : #include <stdexcept>
      28                 : #include <stop_token>
      29                 : #include <tuple>
      30                 : #include <type_traits>
      31                 : #include <utility>
      32                 : #include <variant>
      33                 : #include <vector>
      34                 : 
      35                 : /*
      36                 :    when_any - Race multiple tasks, return first completion
      37                 :    ========================================================
      38                 : 
      39                 :    OVERVIEW:
      40                 :    ---------
      41                 :    when_any launches N tasks concurrently and completes when the FIRST task
      42                 :    finishes (success or failure). It then requests stop for all siblings and
      43                 :    waits for them to acknowledge before returning.
      44                 : 
      45                 :    ARCHITECTURE:
      46                 :    -------------
      47                 :    The design mirrors when_all but with inverted completion semantics:
      48                 : 
      49                 :      when_all:  complete when remaining_count reaches 0 (all done)
      50                 :      when_any:  complete when has_winner becomes true (first done)
      51                 :                 BUT still wait for remaining_count to reach 0 for cleanup
      52                 : 
      53                 :    Key components:
      54                 :      - when_any_state:    Shared state tracking winner and completion
      55                 :      - when_any_runner:   Wrapper coroutine for each child task
      56                 :      - when_any_launcher: Awaitable that starts all runners concurrently
      57                 : 
      58                 :    CRITICAL INVARIANTS:
      59                 :    --------------------
      60                 :    1. Exactly one task becomes the winner (via atomic compare_exchange)
      61                 :    2. All tasks must complete before parent resumes (cleanup safety)
      62                 :    3. Stop is requested immediately when winner is determined
      63                 :    4. Only the winner's result/exception is stored
      64                 : 
      65                 :    TYPE DEDUPLICATION:
      66                 :    -------------------
      67                 :    std::variant requires unique alternative types. Since when_any can race
      68                 :    tasks with identical return types (e.g., three task<int>), we must
      69                 :    deduplicate types before constructing the variant.
      70                 : 
      71                 :    Example: when_any(task<int>, task<string>, task<int>)
      72                 :      - Raw types after void->monostate: int, string, int
      73                 :      - Deduplicated variant: std::variant<int, string>
      74                 :      - Return: pair<size_t, variant<int, string>>
      75                 : 
      76                 :    The winner_index tells you which task won (0, 1, or 2), while the variant
      77                 :    holds the result. Use the index to determine how to interpret the variant.
      78                 : 
      79                 :    VOID HANDLING:
      80                 :    --------------
      81                 :    void tasks contribute std::monostate to the variant (then deduplicated).
      82                 :    All-void tasks result in: pair<size_t, variant<monostate>>
      83                 : 
      84                 :    MEMORY MODEL:
      85                 :    -------------
      86                 :    Synchronization chain from winner's write to parent's read:
      87                 : 
      88                 :    1. Winner thread writes result_/winner_exception_ (non-atomic)
      89                 :    2. Winner's runner reaches final_suspend → fetch_sub(acq_rel) on remaining_count_
      90                 :    3. Last runner (may be winner or non-winner) reaches final_suspend
      91                 :       → fetch_sub(acq_rel) on remaining_count_, observing the count drop to 0
      92                 :    4. Last runner returns caller_env_->executor.dispatch(continuation_) for symmetric transfer
      93                 :    5. Parent coroutine resumes and reads result_/winner_exception_
      94                 : 
      95                 :    Synchronization analysis:
      96                 :    - All fetch_sub operations on remaining_count_ form a release sequence
      97                 :    - Winner's fetch_sub releases; subsequent fetch_sub operations participate
      98                 :      in the modification order of remaining_count_
      99                 :    - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
     100                 :      modification order, establishing happens-before from winner's writes
     101                 :    - Executor dispatch() is expected to provide queue-based synchronization
     102                 :      (release-on-post, acquire-on-execute) completing the chain to parent
     103                 :    - Even inline executors work (same thread = sequenced-before)
     104                 : 
     105                 :    Alternative considered: Adding winner_ready_ atomic (set with release after
     106                 :    storing winner data, acquired before reading) would make synchronization
     107                 :    self-contained and not rely on executor implementation details. Current
     108                 :    approach is correct but requires careful reasoning about release sequences
     109                 :    and executor behavior.
     110                 : 
     111                 :    EXCEPTION SEMANTICS:
     112                 :    --------------------
     113                 :    Unlike when_all (which captures first exception, discards others), when_any
     114                 :    treats exceptions as valid completions. If the winning task threw, that
     115                 :    exception is rethrown. Exceptions from non-winners are silently discarded.
     116                 : */
     117                 : 
     118                 : namespace boost {
     119                 : namespace capy {
     120                 : 
     121                 : namespace detail {
     122                 : 
     123                 : /** Convert void to monostate for variant storage.
     124                 : 
     125                 :     std::variant<void, ...> is ill-formed, so void tasks contribute
     126                 :     std::monostate to the result variant instead. Non-void types
     127                 :     pass through unchanged.
     128                 : 
     129                 :     @tparam T The type to potentially convert (void becomes monostate).
     130                 : */
     131                 : template<typename T>
     132                 : using void_to_monostate_t = std::conditional_t<std::is_void_v<T>, std::monostate, T>;
     133                 : 
     134                 : // Type deduplication: std::variant requires unique alternative types.
     135                 : // Fold left over the type list, appending each type only if not already present.
     136                 : template<typename Variant, typename T>
     137                 : struct variant_append_if_unique;
     138                 : 
     139                 : template<typename... Vs, typename T>
     140                 : struct variant_append_if_unique<std::variant<Vs...>, T>
     141                 : {
     142                 :     using type = std::conditional_t<
     143                 :         (std::is_same_v<T, Vs> || ...),
     144                 :         std::variant<Vs...>,
     145                 :         std::variant<Vs..., T>>;
     146                 : };
     147                 : 
     148                 : template<typename Accumulated, typename... Remaining>
     149                 : struct deduplicate_impl;
     150                 : 
     151                 : template<typename Accumulated>
     152                 : struct deduplicate_impl<Accumulated>
     153                 : {
     154                 :     using type = Accumulated;
     155                 : };
     156                 : 
     157                 : template<typename Accumulated, typename T, typename... Rest>
     158                 : struct deduplicate_impl<Accumulated, T, Rest...>
     159                 : {
     160                 :     using next = typename variant_append_if_unique<Accumulated, T>::type;
     161                 :     using type = typename deduplicate_impl<next, Rest...>::type;
     162                 : };
     163                 : 
     164                 : // Deduplicated variant; void types become monostate before deduplication
     165                 : template<typename T0, typename... Ts>
     166                 : using unique_variant_t = typename deduplicate_impl<
     167                 :     std::variant<void_to_monostate_t<T0>>,
     168                 :     void_to_monostate_t<Ts>...>::type;
     169                 : 
     170                 : // Result: (winner_index, deduplicated_variant). Use index to disambiguate
     171                 : // when multiple tasks share the same return type.
     172                 : template<typename T0, typename... Ts>
     173                 : using when_any_result_t = std::pair<std::size_t, unique_variant_t<T0, Ts...>>;
     174                 : 
     175                 : /** Core shared state for when_any operations.
     176                 : 
     177                 :     Contains all members and methods common to both heterogeneous (variadic)
     178                 :     and homogeneous (range) when_any implementations. State classes embed
     179                 :     this via composition to avoid CRTP destructor ordering issues.
     180                 : 
     181                 :     @par Thread Safety
     182                 :     Atomic operations protect winner selection and completion count.
     183                 : */
     184                 : struct when_any_core
     185                 : {
     186                 :     std::atomic<std::size_t> remaining_count_;
     187                 :     std::size_t winner_index_{0};
     188                 :     std::exception_ptr winner_exception_;
     189                 :     std::stop_source stop_source_;
     190                 : 
     191                 :     // Bridges parent's stop token to our stop_source
     192                 :     struct stop_callback_fn
     193                 :     {
     194                 :         std::stop_source* source_;
     195 HIT           9 :         void operator()() const noexcept { source_->request_stop(); }
     196                 :     };
     197                 :     using stop_callback_t = std::stop_callback<stop_callback_fn>;
     198                 :     std::optional<stop_callback_t> parent_stop_callback_;
     199                 : 
     200                 :     std::coroutine_handle<> continuation_;
     201                 :     io_env const* caller_env_ = nullptr;
     202                 : 
     203                 :     // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
     204                 :     std::atomic<bool> has_winner_{false};
     205                 : 
     206              65 :     explicit when_any_core(std::size_t count) noexcept
     207              65 :         : remaining_count_(count)
     208                 :     {
     209              65 :     }
     210                 : 
     211                 :     /** Atomically claim winner status; exactly one task succeeds. */
     212             190 :     bool try_win(std::size_t index) noexcept
     213                 :     {
     214             190 :         bool expected = false;
     215             190 :         if(has_winner_.compare_exchange_strong(
     216                 :             expected, true, std::memory_order_acq_rel))
     217                 :         {
     218              65 :             winner_index_ = index;
     219              65 :             stop_source_.request_stop();
     220              65 :             return true;
     221                 :         }
     222             125 :         return false;
     223                 :     }
     224                 : 
     225                 :     /** @pre try_win() returned true. */
     226               8 :     void set_winner_exception(std::exception_ptr ep) noexcept
     227                 :     {
     228               8 :         winner_exception_ = ep;
     229               8 :     }
     230                 : 
     231                 :     // Runners signal completion directly via final_suspend; no member function needed.
     232                 : };
     233                 : 
     234                 : /** Shared state for heterogeneous when_any operation.
     235                 : 
     236                 :     Coordinates winner selection, result storage, and completion tracking
     237                 :     for all child tasks in a when_any operation. Uses composition with
     238                 :     when_any_core for shared functionality.
     239                 : 
     240                 :     @par Lifetime
     241                 :     Allocated on the parent coroutine's frame, outlives all runners.
     242                 : 
     243                 :     @tparam T0 First task's result type.
     244                 :     @tparam Ts Remaining tasks' result types.
     245                 : */
     246                 : template<typename T0, typename... Ts>
     247                 : struct when_any_state
     248                 : {
     249                 :     static constexpr std::size_t task_count = 1 + sizeof...(Ts);
     250                 :     using variant_type = unique_variant_t<T0, Ts...>;
     251                 : 
     252                 :     when_any_core core_;
     253                 :     std::optional<variant_type> result_;
     254                 :     std::array<std::coroutine_handle<>, task_count> runner_handles_{};
     255                 : 
     256              43 :     when_any_state()
     257              43 :         : core_(task_count)
     258                 :     {
     259              43 :     }
     260                 : 
     261                 :     // Runners self-destruct in final_suspend. No destruction needed here.
     262                 : 
     263                 :     /** @pre core_.try_win() returned true.
     264                 :         @note Uses in_place_type (not index) because variant is deduplicated.
     265                 :     */
     266                 :     template<typename T>
     267              35 :     void set_winner_result(T value)
     268                 :         noexcept(std::is_nothrow_move_constructible_v<T>)
     269                 :     {
     270              35 :         result_.emplace(std::in_place_type<T>, std::move(value));
     271              35 :     }
     272                 : 
     273                 :     /** @pre core_.try_win() returned true. */
     274               3 :     void set_winner_void() noexcept
     275                 :     {
     276               3 :         result_.emplace(std::in_place_type<std::monostate>, std::monostate{});
     277               3 :     }
     278                 : };
     279                 : 
     280                 : /** Wrapper coroutine that runs a single child task for when_any.
     281                 : 
     282                 :     Propagates executor/stop_token to the child, attempts to claim winner
     283                 :     status on completion, and signals completion for cleanup coordination.
     284                 : 
     285                 :     @tparam StateType The state type (when_any_state or when_any_homogeneous_state).
     286                 : */
     287                 : template<typename StateType>
     288                 : struct when_any_runner
     289                 : {
     290                 :     struct promise_type // : frame_allocating_base  // DISABLED FOR TESTING
     291                 :     {
     292                 :         StateType* state_ = nullptr;
     293                 :         std::size_t index_ = 0;
     294                 :         io_env env_;
     295                 : 
     296             190 :         when_any_runner get_return_object() noexcept
     297                 :         {
     298             190 :             return when_any_runner(std::coroutine_handle<promise_type>::from_promise(*this));
     299                 :         }
     300                 : 
     301                 :         // Starts suspended; the launcher fills in state_/index_/env_, then posts the handle to the executor
     302             190 :         std::suspend_always initial_suspend() noexcept
     303                 :         {
     304             190 :             return {};
     305                 :         }
     306                 : 
     307             190 :         auto final_suspend() noexcept
     308                 :         {
     309                 :             struct awaiter
     310                 :             {
     311                 :                 promise_type* p_;
     312             190 :                 bool await_ready() const noexcept { return false; }
     313             190 :                 std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept
     314                 :                 {
     315                 :                     // Extract everything needed before self-destruction.
     316             190 :                     auto& core = p_->state_->core_;
     317             190 :                     auto* counter = &core.remaining_count_;
     318             190 :                     auto* caller_env = core.caller_env_;
     319             190 :                     auto cont = core.continuation_;
     320                 : 
     321             190 :                     h.destroy();
     322                 : 
     323                 :                     // If last runner, dispatch parent for symmetric transfer.
     324             190 :                     auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
     325             190 :                     if(remaining == 1)
     326              65 :                         return caller_env->executor.dispatch(cont);
     327             125 :                     return std::noop_coroutine();
     328                 :                 }
     329 MIS           0 :                 void await_resume() const noexcept {}
     330                 :             };
     331 HIT         190 :             return awaiter{this};
     332                 :         }
     333                 : 
     334             178 :         void return_void() noexcept {}
     335                 : 
     336                 :         // Exceptions are valid completions in when_any (unlike when_all)
     337              12 :         void unhandled_exception()
     338                 :         {
     339              12 :             if(state_->core_.try_win(index_))
     340               8 :                 state_->core_.set_winner_exception(std::current_exception());
     341              12 :         }
     342                 : 
     343                 :         /** Injects executor and stop token into child awaitables. */
     344                 :         template<class Awaitable>
     345                 :         struct transform_awaiter
     346                 :         {
     347                 :             std::decay_t<Awaitable> a_;
     348                 :             promise_type* p_;
     349                 : 
     350             190 :             bool await_ready() { return a_.await_ready(); }
     351             190 :             auto await_resume() { return a_.await_resume(); }
     352                 : 
     353                 :             template<class Promise>
     354             185 :             auto await_suspend(std::coroutine_handle<Promise> h)
     355                 :             {
     356             185 :                 return a_.await_suspend(h, &p_->env_);
     357                 :             }
     358                 :         };
     359                 : 
     360                 :         template<class Awaitable>
     361             190 :         auto await_transform(Awaitable&& a)
     362                 :         {
     363                 :             using A = std::decay_t<Awaitable>;
     364                 :             if constexpr (IoAwaitable<A>)
     365                 :             {
     366                 :                 return transform_awaiter<Awaitable>{
     367             380 :                     std::forward<Awaitable>(a), this};
     368                 :             }
     369                 :             else
     370                 :             {
     371                 :                 static_assert(sizeof(A) == 0, "requires IoAwaitable");
     372                 :             }
     373             190 :         }
     374                 :     };
     375                 : 
     376                 :     std::coroutine_handle<promise_type> h_;
     377                 : 
     378             190 :     explicit when_any_runner(std::coroutine_handle<promise_type> h) noexcept
     379             190 :         : h_(h)
     380                 :     {
     381             190 :     }
     382                 : 
     383                 :     // Enable move for all clang versions - some versions need it
     384                 :     when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
     385                 : 
     386                 :     // Non-copyable
     387                 :     when_any_runner(when_any_runner const&) = delete;
     388                 :     when_any_runner& operator=(when_any_runner const&) = delete;
     389                 :     when_any_runner& operator=(when_any_runner&&) = delete;
     390                 : 
     391             190 :     auto release() noexcept
     392                 :     {
     393             190 :         return std::exchange(h_, nullptr);
     394                 :     }
     395                 : };
     396                 : 
     397                 : /** Wraps a child awaitable, attempts to claim winner on completion.
     398                 : 
     399                 :     Branches on the awaitable's result type (void vs non-void):
     400                 :     - void, and set_winner_void() detected via a requires-expression: heterogeneous state (stores monostate)
     401                 :     - non-void: set_winner_result() is called unconditionally
     402                 :     - void without set_winner_void(): homogeneous void tasks (no result storage)
     403                 : */
     404                 : template<IoAwaitable Awaitable, typename StateType>
     405                 : when_any_runner<StateType>
     406             190 : make_when_any_runner(Awaitable inner, StateType* state, std::size_t index)
     407                 : {
     408                 :     using T = awaitable_result_t<Awaitable>;
     409                 :     if constexpr (std::is_void_v<T>)
     410                 :     {
     411                 :         co_await std::move(inner);
     412                 :         if(state->core_.try_win(index))
     413                 :         {
     414                 :             // Heterogeneous void tasks store monostate in the variant
     415                 :             if constexpr (requires { state->set_winner_void(); })
     416                 :                 state->set_winner_void();
     417                 :             // Homogeneous void tasks have no result to store
     418                 :         }
     419                 :     }
     420                 :     else
     421                 :     {
     422                 :         auto result = co_await std::move(inner);
     423                 :         if(state->core_.try_win(index))
     424                 :         {
     425                 :             // Defensive: move should not throw (already moved once), but we
     426                 :             // catch just in case since an uncaught exception would be devastating.
     427                 :             try
     428                 :             {
     429                 :                 state->set_winner_result(std::move(result));
     430                 :             }
     431                 :             catch(...)
     432                 :             {
     433                 :                 state->core_.set_winner_exception(std::current_exception());
     434                 :             }
     435                 :         }
     436                 :     }
     437             380 : }
     438                 : 
     439                 : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
     440                 : template<IoAwaitable... Awaitables>
     441                 : class when_any_launcher
     442                 : {
     443                 :     using state_type = when_any_state<awaitable_result_t<Awaitables>...>;
     444                 : 
     445                 :     std::tuple<Awaitables...>* tasks_;
     446                 :     state_type* state_;
     447                 : 
     448                 : public:
     449              43 :     when_any_launcher(
     450                 :         std::tuple<Awaitables...>* tasks,
     451                 :         state_type* state)
     452              43 :         : tasks_(tasks)
     453              43 :         , state_(state)
     454                 :     {
     455              43 :     }
     456                 : 
     457              43 :     bool await_ready() const noexcept
     458                 :     {
     459              43 :         return sizeof...(Awaitables) == 0;
     460                 :     }
     461                 : 
     462                 :     /** CRITICAL: If the last task finishes synchronously, parent resumes and
     463                 :         destroys this object before await_suspend returns. Must not reference
     464                 :         `this` after the final launch_one call.
     465                 :     */
     466              43 :     std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
     467                 :     {
     468              43 :         state_->core_.continuation_ = continuation;
     469              43 :         state_->core_.caller_env_ = caller_env;
     470                 : 
     471              43 :         if(caller_env->stop_token.stop_possible())
     472                 :         {
     473              18 :             state_->core_.parent_stop_callback_.emplace(
     474               9 :                 caller_env->stop_token,
     475               9 :                 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
     476                 : 
     477               9 :             if(caller_env->stop_token.stop_requested())
     478               3 :                 state_->core_.stop_source_.request_stop();
     479                 :         }
     480                 : 
     481              43 :         auto token = state_->core_.stop_source_.get_token();
     482              86 :         [&]<std::size_t... Is>(std::index_sequence<Is...>) {
     483              43 :             (..., launch_one<Is>(caller_env->executor, token));
     484              43 :         }(std::index_sequence_for<Awaitables...>{});
     485                 : 
     486              86 :         return std::noop_coroutine();
     487              43 :     }
     488                 : 
     489              43 :     void await_resume() const noexcept
     490                 :     {
     491              43 :     }
     492                 : 
     493                 : private:
     494                 :     /** @pre executor_ref::post() must not throw; the runner handle has already been released and would leak. */
     495                 :     template<std::size_t I>
     496             105 :     void launch_one(executor_ref caller_ex, std::stop_token token)
     497                 :     {
     498             105 :         auto runner = make_when_any_runner(
     499             105 :             std::move(std::get<I>(*tasks_)), state_, I);
     500                 : 
     501             105 :         auto h = runner.release();
     502             105 :         h.promise().state_ = state_;
     503             105 :         h.promise().index_ = I;
     504             105 :         h.promise().env_ = io_env{caller_ex, token, state_->core_.caller_env_->allocator};
     505                 : 
     506             105 :         std::coroutine_handle<> ch{h};
     507             105 :         state_->runner_handles_[I] = ch;
     508             105 :         caller_ex.post(ch);
     509             210 :     }
     510                 : };
     511                 : 
     512                 : } // namespace detail
     513                 : 
     514                 : /** Wait for the first awaitable to complete.
     515                 : 
     516                 :     Races multiple heterogeneous awaitables concurrently and returns when the
     517                 :     first one completes. The result includes the winner's index and a
     518                 :     deduplicated variant containing the result value.
     519                 : 
     520                 :     @par Suspends
     521                 :     The calling coroutine suspends when co_await is invoked. All awaitables
     522                 :     are launched concurrently on the caller's executor and may run in parallel
     523                 :     or interleaved, depending on that executor. The coroutine resumes only after
     524                 :     all awaitables have completed, even though the first to finish is the winner.
     525                 : 
     526                 :     @par Completion Conditions
     527                 :     @li Winner is determined when the first awaitable completes (success or exception)
     528                 :     @li Only one task can claim winner status via atomic compare-exchange
     529                 :     @li Once a winner exists, stop is requested for all remaining siblings
     530                 :     @li Parent coroutine resumes only after all siblings acknowledge completion
     531                 :     @li The winner's result is returned; if the winner threw, the exception is rethrown
     532                 : 
     533                 :     @par Cancellation Semantics
     534                 :     Cancellation is supported via stop_token propagated through the
     535                 :     IoAwaitable protocol:
     536                 :     @li Each child awaitable receives a stop_token derived from a shared stop_source
     537                 :     @li When the parent's stop token is activated, the stop is forwarded to all children
     538                 :     @li When a winner is determined, stop_source_.request_stop() is called immediately
     539                 :     @li Siblings must handle cancellation gracefully and complete before parent resumes
     540                 :     @li Stop requests are cooperative; tasks must check and respond to them
     541                 : 
     542                 :     @par Concurrency/Overlap
     543                 :     All awaitables are launched concurrently before any can complete.
     544                 :     The launcher iterates through the arguments, starting each task on the
     545                 :     caller's executor. Tasks may execute in parallel on multi-threaded
     546                 :     executors or interleave on single-threaded executors. There is no
     547                 :     guaranteed ordering of task completion.
     548                 : 
     549                 :     @par Notable Error Conditions
     550                 :     @li Winner exception: if the winning task threw, that exception is rethrown
     551                 :     @li Non-winner exceptions: silently discarded (only winner's result matters)
     552                 :     @li Cancellation: tasks may complete via cancellation without throwing
     553                 : 
     554                 :     @par Example
     555                 :     @code
     556                 :     task<void> example() {
     557                 :         auto [index, result] = co_await when_any(
     558                 :             fetch_from_primary(),   // task<Response>
     559                 :             fetch_from_backup()     // task<Response>
     560                 :         );
     561                 :         // index is 0 or 1, result holds the winner's Response
     562                 :         auto response = std::get<Response>(result);
     563                 :     }
     564                 :     @endcode
     565                 : 
     566                 :     @par Example with Heterogeneous Types
     567                 :     @code
     568                 :     task<void> mixed_types() {
     569                 :         auto [index, result] = co_await when_any(
     570                 :             fetch_int(),      // task<int>
     571                 :             fetch_string()    // task<std::string>
     572                 :         );
     573                 :         if (index == 0)
     574                 :             std::cout << "Got int: " << std::get<int>(result) << "\n";
     575                 :         else
     576                 :             std::cout << "Got string: " << std::get<std::string>(result) << "\n";
     577                 :     }
     578                 :     @endcode
     579                 : 
     580                 :     @tparam A0 First awaitable type (must satisfy IoAwaitable).
     581                 :     @tparam As Remaining awaitable types (must satisfy IoAwaitable).
     582                 :     @param a0 The first awaitable to race.
     583                 :     @param as Additional awaitables to race concurrently.
     584                 :     @return A task yielding a pair of (winner_index, result_variant).
     585                 : 
     586                 :     @throws Rethrows the winner's exception if the winning task threw an exception.
     587                 : 
     588                 :     @par Remarks
     589                 :     Awaitables are moved into the coroutine frame; original objects become
     590                 :     empty after the call. When multiple awaitables share the same return type,
     591                 :     the variant is deduplicated to contain only unique types. Use the winner
     592                 :     index to determine which awaitable completed first. Void awaitables
     593                 :     contribute std::monostate to the variant.
     594                 : 
     595                 :     @see when_all, IoAwaitable
     596                 : */
     597                 : template<IoAwaitable A0, IoAwaitable... As>
     598              43 : [[nodiscard]] auto when_any(A0 a0, As... as)
     599                 :     -> task<detail::when_any_result_t<
     600                 :         detail::awaitable_result_t<A0>,
     601                 :         detail::awaitable_result_t<As>...>>
     602                 : {
     603                 :     using result_type = detail::when_any_result_t<
     604                 :         detail::awaitable_result_t<A0>,
     605                 :         detail::awaitable_result_t<As>...>;
     606                 :     // The shared race state and the awaitables are locals of this coroutine; the launcher only receives pointers to them.
     607                 :     detail::when_any_state<
     608                 :         detail::awaitable_result_t<A0>,
     609                 :         detail::awaitable_result_t<As>...> state;
     610                 :     std::tuple<A0, As...> awaitable_tuple(std::move(a0), std::move(as)...); // by-value parameters are moved into one tuple
     611                 : 
     612                 :     co_await detail::when_any_launcher<A0, As...>(&awaitable_tuple, &state); // the outcome is read back from `state` below
     613                 :     // A stored winner exception takes precedence over producing a value.
     614                 :     if(state.core_.winner_exception_)
     615                 :         std::rethrow_exception(state.core_.winner_exception_);
     616                 :     // NOTE(review): result_ is dereferenced without a has_value() check; this relies on the winner having stored a value - confirm.
     617                 :     co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
     618              86 : }
     619                 : 
     620                 : /** Concept for ranges of full I/O awaitables.
     621                 : 
     622                 :     A range satisfies `IoAwaitableRange` if it is a sized input range
     623                 :     whose value type satisfies @ref IoAwaitable. This enables when_any
     624                 :     to accept any container or view of awaitables, not just std::vector.
     625                 : 
     626                 :     @tparam R The range type.
     627                 : 
     628                 :     @par Requirements
     629                 :     @li `R` must satisfy `std::ranges::input_range`
     630                 :     @li `R` must satisfy `std::ranges::sized_range`
     631                 :     @li `std::ranges::range_value_t<R>` must satisfy @ref IoAwaitable
     632                 : 
     633                 :     @par Syntactic Requirements
     634                 :     Given `r` of type `R`:
     635                 :     @li `std::ranges::begin(r)` is valid
     636                 :     @li `std::ranges::end(r)` is valid
     637                 :     @li `std::ranges::size(r)` returns `std::ranges::range_size_t<R>`
     638                 :     @li `std::ranges::range_value_t<R>`, the value type of `*std::ranges::begin(r)`, satisfies @ref IoAwaitable
     639                 : 
     640                 :     @par Example
     641                 :     @code
     642                 :     template<IoAwaitableRange R>
     643                 :     task<void> race_all(R&& awaitables) {
     644                 :         auto winner = co_await when_any(std::forward<R>(awaitables));
     645                 :         // Process winner...
     646                 :     }
     647                 :     @endcode
     648                 : 
     649                 :     @see when_any, IoAwaitable
     650                 : */
     651                 : template<typename R>
     652                 : concept IoAwaitableRange =
     653                 :     std::ranges::input_range<R> && // iterable via std::ranges::begin/end
     654                 :     std::ranges::sized_range<R> && // std::ranges::size(r) is available; the range overloads of when_any call it
     655                 :     IoAwaitable<std::ranges::range_value_t<R>>; // the element value type (not the iterator's reference type) must be an IoAwaitable
     656                 : 
     657                 : namespace detail {
     658                 : 
     659                 : /** Shared state for homogeneous when_any (range overload).
     660                 : 
     661                 :     Uses composition with when_any_core for shared functionality.
     662                 :     Simpler than heterogeneous: optional<T> instead of variant, vector
     663                 :     instead of array for runner handles. @tparam T result type of the awaitables; the void case is handled by a separate specialization.
     664                 : */
     665                 : template<typename T>
     666                 : struct when_any_homogeneous_state
     667                 : {
     668                 :     when_any_core core_; // shared bookkeeping; constructed from the element count
     669                 :     std::optional<T> result_; // empty until set_winner_result() stores a value
     670                 :     std::vector<std::coroutine_handle<>> runner_handles_; // one slot per awaitable, filled by the launcher
     671                 :     // Sizes both core_ and runner_handles_ from the number of awaitables in the range.
     672              19 :     explicit when_any_homogeneous_state(std::size_t count)
     673              19 :         : core_(count)
     674              38 :         , runner_handles_(count)
     675                 :     {
     676              19 :     }
     677                 : 
     678                 :     // Runners self-destruct in final_suspend. No destruction needed here.
     679                 : 
     680                 :     /** Move `value` into result_. noexcept exactly when T is nothrow move constructible. @pre core_.try_win() returned true. */
     681              17 :     void set_winner_result(T value)
     682                 :         noexcept(std::is_nothrow_move_constructible_v<T>)
     683                 :     {
     684              17 :         result_.emplace(std::move(value));
     685              17 :     }
     686                 : };
     687                 : 
     688                 : /** Specialization for void tasks (no result storage needed): it has core_ and runner_handles_ but no result_ member and no set_winner_result(). */
     689                 : template<>
     690                 : struct when_any_homogeneous_state<void>
     691                 : {
     692                 :     when_any_core core_; // shared bookkeeping; constructed from the element count
     693                 :     std::vector<std::coroutine_handle<>> runner_handles_; // one slot per awaitable, filled by the launcher
     694                 :     // Sizes both members from the number of awaitables in the range.
     695               3 :     explicit when_any_homogeneous_state(std::size_t count)
     696               3 :         : core_(count)
     697               6 :         , runner_handles_(count)
     698                 :     {
     699               3 :     }
     700                 : 
     701                 :     // Runners self-destruct in final_suspend. No destruction needed here.
     702                 : 
     703                 :     // No set_winner_result - void tasks have no result to store
     704                 : };
     705                 : 
     706                 : /** Awaiter used by the range overloads of when_any: creates one runner per element of *range_ and posts every runner to the caller's executor; see await_suspend for lifetime concerns. Holds raw, non-owning pointers to the range and to the shared state. */
     707                 : template<IoAwaitableRange Range>
     708                 : class when_any_homogeneous_launcher
     709                 : {
     710                 :     using Awaitable = std::ranges::range_value_t<Range>;
     711                 :     using T = awaitable_result_t<Awaitable>;
     712                 : 
     713                 :     Range* range_; // awaitables to launch; elements are moved out in await_suspend
     714                 :     when_any_homogeneous_state<T>* state_; // shared state that receives the continuation, env and runner handles
     715                 : 
     716                 : public:
     717              22 :     when_any_homogeneous_launcher(
     718                 :         Range* range,
     719                 :         when_any_homogeneous_state<T>* state)
     720              22 :         : range_(range)
     721              22 :         , state_(state)
     722                 :     {
     723              22 :     }
     724                 : 
     725              22 :     bool await_ready() const noexcept
     726                 :     {
     727              22 :         return std::ranges::empty(*range_); // true for an empty range, so no suspension; the when_any overloads in this file throw before co_await when the size is 0
     728                 :     }
     729                 : 
     730                 :     /** Stores the continuation and caller environment in the shared state, sets
     731                 :         up the parent stop callback, then creates and posts one runner per element.
     732                 :         CRITICAL: if the last task finishes synchronously, the parent resumes and destroys this object before await_suspend returns, so `this` must not be referenced once posting begins.
     733                 : 
     734                 :         Two-phase approach:
     735                 :         1. Create all runners (safe - nothing has been posted yet)
     736                 :         2. Post all runners (any may complete synchronously)
     737                 :         @return std::noop_coroutine(); the continuation is not resumed from here. */
     738              22 :     std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
     739                 :     {
     740              22 :         state_->core_.continuation_ = continuation;
     741              22 :         state_->core_.caller_env_ = caller_env;
     742                 :         // Only register a callback when the caller's token can ever be stopped.
     743              22 :         if(caller_env->stop_token.stop_possible())
     744                 :         {
     745              14 :             state_->core_.parent_stop_callback_.emplace(
     746               7 :                 caller_env->stop_token,
     747               7 :                 when_any_core::stop_callback_fn{&state_->core_.stop_source_}); // callback is given the shared stop_source_ (presumably to request stop on it - confirm in when_any_core)
     748                 :             // A stop that was already requested is applied to the shared stop source right away.
     749               7 :             if(caller_env->stop_token.stop_requested())
     750               4 :                 state_->core_.stop_source_.request_stop();
     751                 :         }
     752                 : 
     753              22 :         auto token = state_->core_.stop_source_.get_token(); // every runner env below gets this token
     754                 : 
     755                 :         // Phase 1: Create all runners without posting.
     756                 :         // This iterates over *range_ safely because no runners execute yet.
     757              22 :         std::size_t index = 0;
     758             107 :         for(auto&& a : *range_)
     759                 :         {
     760              85 :             auto runner = make_when_any_runner(
     761              85 :                 std::move(a), state_, index); // the element is moved out of the range
     762                 : 
     763              85 :             auto h = runner.release(); // raw handle; not destroyed anywhere in this function
     764              85 :             h.promise().state_ = state_;
     765              85 :             h.promise().index_ = index;
     766              85 :             h.promise().env_ = io_env{caller_env->executor, token, caller_env->allocator};
     767                 : 
     768              85 :             state_->runner_handles_[index] = std::coroutine_handle<>{h}; // slot count was fixed by the state's constructor
     769              85 :             ++index;
     770                 :         }
     771                 : 
     772                 :         // Phase 2: Post all runners. Any may complete synchronously.
     773                 :         // After last post, state_ and this may be destroyed.
     774                 :         // Use raw pointer/count captured before posting.
     775              22 :         std::coroutine_handle<>* handles = state_->runner_handles_.data();
     776              22 :         std::size_t count = state_->runner_handles_.size();
     777             107 :         for(std::size_t i = 0; i < count; ++i)
     778              85 :             caller_env->executor.post(handles[i]); // uses only locals and the caller_env parameter, never a member of `this`
     779                 : 
     780              44 :         return std::noop_coroutine();
     781             107 :     }
     782                 : 
     783              22 :     void await_resume() const noexcept
     784                 :     { // Intentionally empty: the when_any overloads read the outcome from the shared state.
     785              22 :     }
     786                 : };
     787                 : 
     788                 : } // namespace detail
     789                 : 
     790                 : /** Wait for the first awaitable to complete (range overload).
     791                 : 
     792                 :     Races a range of awaitables with the same result type. Accepts any
     793                 :     sized input range of IoAwaitable types, enabling use with arrays,
     794                 :     spans, or custom containers.
     795                 : 
     796                 :     @par Suspends
     797                 :     The calling coroutine suspends when co_await is invoked. All awaitables
     798                 :     in the range are launched concurrently on the caller's executor and may
     799                 :     run in parallel or interleaved, depending on that executor. The coroutine resumes
     800                 :     only after all awaitables have completed, even though the first to finish is the winner.
     801                 : 
     802                 :     @par Completion Conditions
     803                 :     @li Winner is determined when the first awaitable completes (success or exception)
     804                 :     @li Only one task can claim winner status via atomic compare-exchange
     805                 :     @li Once a winner exists, stop is requested for all remaining siblings
     806                 :     @li Parent coroutine resumes only after all siblings acknowledge completion
     807                 :     @li The winner's index and result are returned; if the winner threw, the exception is rethrown
     808                 : 
     809                 :     @par Cancellation Semantics
     810                 :     Cancellation is supported via stop_token propagated through the
     811                 :     IoAwaitable protocol:
     812                 :     @li Each child awaitable receives a stop_token derived from a shared stop_source
     813                 :     @li When the parent's stop token is activated, the stop is forwarded to all children
     814                 :     @li When a winner is determined, stop_source_.request_stop() is called immediately
     815                 :     @li Siblings must handle cancellation gracefully and complete before parent resumes
     816                 :     @li Stop requests are cooperative; tasks must check and respond to them
     817                 : 
     818                 :     @par Concurrency/Overlap
     819                 :     All awaitables are launched concurrently before any can complete.
     820                 :     The launcher iterates through the range, starting each task on the
     821                 :     caller's executor. Tasks may execute in parallel on multi-threaded
     822                 :     executors or interleave on single-threaded executors. There is no
     823                 :     guaranteed ordering of task completion.
     824                 : 
     825                 :     @par Notable Error Conditions
     826                 :     @li Empty range: throws std::invalid_argument immediately (not via co_return)
     827                 :     @li Winner exception: if the winning task threw, that exception is rethrown
     828                 :     @li Non-winner exceptions: silently discarded (only winner's result matters)
     829                 :     @li Cancellation: tasks may complete via cancellation without throwing
     830                 : 
     831                 :     @par Example
     832                 :     @code
     833                 :     task<void> example() {
     834                 :         std::array<task<Response>, 3> requests = {
     835                 :             fetch_from_server(0),
     836                 :             fetch_from_server(1),
     837                 :             fetch_from_server(2)
     838                 :         };
     839                 : 
     840                 :         auto [index, response] = co_await when_any(std::move(requests));
     841                 :     }
     842                 :     @endcode
     843                 : 
     844                 :     @par Example with Vector
     845                 :     @code
     846                 :     task<Response> fetch_fastest(std::vector<Server> const& servers) {
     847                 :         std::vector<task<Response>> requests;
     848                 :         for (auto const& server : servers)
     849                 :             requests.push_back(fetch_from(server));
     850                 : 
     851                 :         auto [index, response] = co_await when_any(std::move(requests));
     852                 :         co_return response;
     853                 :     }
     854                 :     @endcode
     855                 : 
     856                 :     @tparam R Range type satisfying IoAwaitableRange.
     857                 :     @param awaitables Range of awaitables to race concurrently (must not be empty).
     858                 :     @return A task yielding a pair of (winner_index, result).
     859                 : 
     860                 :     @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
     861                 :     @throws Rethrows the winner's exception if the winning task threw an exception.
     862                 : 
     863                 :     @par Remarks
     864                 :     The range is forwarded into a local of type `std::remove_cvref_t<R>` that
     865                 :     lives on the coroutine frame: rvalue ranges are moved, lvalue ranges are
     866                 :     copied. Elements are then moved out of that local; a non-owning view still
     867                 :     aliases the caller's elements, which are left moved-from. Unlike the variadic
     868                 :     overload, no variant wrapper is needed since all tasks share one return type.
     869                 : 
     870                 :     @see when_any, IoAwaitableRange
     871                 : */
     872                 : template<IoAwaitableRange R>
     873                 :     requires (!std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>) // non-void results only; void ranges use the other range overload
     874              21 : [[nodiscard]] auto when_any(R&& awaitables)
     875                 :     -> task<std::pair<std::size_t, detail::awaitable_result_t<std::ranges::range_value_t<R>>>>
     876                 : {
     877                 :     using Awaitable = std::ranges::range_value_t<R>;
     878                 :     using T = detail::awaitable_result_t<Awaitable>;
     879                 :     using result_type = std::pair<std::size_t, T>;
     880                 :     using OwnedRange = std::remove_cvref_t<R>; // value type used to hold the range as a local
     881                 :     // NOTE(review): `awaitables` is a reference that is first read inside the coroutine body; if task<> starts lazily, the referenced range must still be alive at that point - confirm.
     882                 :     auto count = std::ranges::size(awaitables);
     883                 :     if(count == 0)
     884                 :         throw std::invalid_argument("when_any requires at least one awaitable"); // NOTE(review): raised inside the coroutine body; when the caller observes it depends on task<>'s promise - confirm against the @throws text
     885                 : 
     886                 :     // Move/copy range onto coroutine frame to ensure lifetime
     887                 :     OwnedRange owned_awaitables = std::forward<R>(awaitables); // rvalue argument: move; lvalue argument: copy
     888                 : 
     889                 :     detail::when_any_homogeneous_state<T> state(count);
     890                 :     // The launcher only receives pointers; both pointees are locals of this coroutine.
     891                 :     co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
     892                 :     // A stored winner exception takes precedence over producing a value.
     893                 :     if(state.core_.winner_exception_)
     894                 :         std::rethrow_exception(state.core_.winner_exception_);
     895                 :     // NOTE(review): result_ is dereferenced without a has_value() check; this relies on the winner having stored a value - confirm.
     896                 :     co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
     897              42 : }
     898                 : 
     899                 : /** Wait for the first awaitable to complete (void range overload).
     900                 : 
     901                 :     Races a range of void-returning awaitables. Since void awaitables have
     902                 :     no result value, only the winner's index is returned.
     903                 : 
     904                 :     @par Suspends
     905                 :     The calling coroutine suspends when co_await is invoked. All awaitables
     906                 :     in the range are launched concurrently on the caller's executor and may
     907                 :     run in parallel or interleaved, depending on that executor. The coroutine resumes
     908                 :     only after all awaitables have completed, even though the first to finish is the winner.
     909                 : 
     910                 :     @par Completion Conditions
     911                 :     @li Winner is determined when the first awaitable completes (success or exception)
     912                 :     @li Only one task can claim winner status via atomic compare-exchange
     913                 :     @li Once a winner exists, stop is requested for all remaining siblings
     914                 :     @li Parent coroutine resumes only after all siblings acknowledge completion
     915                 :     @li The winner's index is returned; if the winner threw, the exception is rethrown
     916                 : 
     917                 :     @par Cancellation Semantics
     918                 :     Cancellation is supported via stop_token propagated through the
     919                 :     IoAwaitable protocol:
     920                 :     @li Each child awaitable receives a stop_token derived from a shared stop_source
     921                 :     @li When the parent's stop token is activated, the stop is forwarded to all children
     922                 :     @li When a winner is determined, stop_source_.request_stop() is called immediately
     923                 :     @li Siblings must handle cancellation gracefully and complete before parent resumes
     924                 :     @li Stop requests are cooperative; tasks must check and respond to them
     925                 : 
     926                 :     @par Concurrency/Overlap
     927                 :     All awaitables are launched concurrently before any can complete.
     928                 :     The launcher iterates through the range, starting each task on the
     929                 :     caller's executor. Tasks may execute in parallel on multi-threaded
     930                 :     executors or interleave on single-threaded executors. There is no
     931                 :     guaranteed ordering of task completion.
     932                 : 
     933                 :     @par Notable Error Conditions
     934                 :     @li Empty range: throws std::invalid_argument immediately (not via co_return)
     935                 :     @li Winner exception: if the winning task threw, that exception is rethrown
     936                 :     @li Non-winner exceptions: silently discarded (only winner's result matters)
     937                 :     @li Cancellation: tasks may complete via cancellation without throwing
     938                 : 
     939                 :     @par Example
     940                 :     @code
     941                 :     task<void> example() {
     942                 :         std::vector<task<void>> tasks;
     943                 :         for (int i = 0; i < 5; ++i)
     944                 :             tasks.push_back(background_work(i));
     945                 : 
     946                 :         std::size_t winner = co_await when_any(std::move(tasks));
     947                 :         // winner is the index of the first task to complete
     948                 :     }
     949                 :     @endcode
     950                 : 
     951                 :     @par Example with Timeout
     952                 :     @code
     953                 :     task<void> with_timeout() {
     954                 :         std::vector<task<void>> tasks;
     955                 :         tasks.push_back(long_running_operation());
     956                 :         tasks.push_back(delay(std::chrono::seconds(5)));
     957                 : 
     958                 :         std::size_t winner = co_await when_any(std::move(tasks));
     959                 :         if (winner == 1) {
     960                 :             // Timeout occurred
     961                 :         }
     962                 :     }
     963                 :     @endcode
     964                 : 
     965                 :     @tparam R Range type satisfying IoAwaitableRange with void result.
     966                 :     @param awaitables Range of void awaitables to race concurrently (must not be empty).
     967                 :     @return A task yielding the winner's index (zero-based).
     968                 : 
     969                 :     @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
     970                 :     @throws Rethrows the winner's exception if the winning task threw an exception.
     971                 : 
     972                 :     @par Remarks
     973                 :     The range is forwarded into a local of type `std::remove_cvref_t<R>` that
     974                 :     lives on the coroutine frame: rvalue ranges are moved, lvalue ranges are
     975                 :     copied. Elements are then moved out of that local; a non-owning view still
     976                 :     aliases the caller's elements, which are left moved-from. Unlike the non-void
     977                 :     overload, no result storage is needed since void tasks produce no value.
     978                 : 
     979                 :     @see when_any, IoAwaitableRange
     980                 : */
     981                 : template<IoAwaitableRange R>
     982                 :     requires std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>> // void results only; the task yields just the winner's index
     983               3 : [[nodiscard]] auto when_any(R&& awaitables) -> task<std::size_t>
     984                 : {
     985                 :     using OwnedRange = std::remove_cvref_t<R>; // value type used to hold the range as a local
     986                 :     // NOTE(review): `awaitables` is a reference that is first read inside the coroutine body; if task<> starts lazily, the referenced range must still be alive at that point - confirm.
     987                 :     auto count = std::ranges::size(awaitables);
     988                 :     if(count == 0)
     989                 :         throw std::invalid_argument("when_any requires at least one awaitable"); // NOTE(review): raised inside the coroutine body; when the caller observes it depends on task<>'s promise - confirm against the @throws text
     990                 : 
     991                 :     // Move/copy range onto coroutine frame to ensure lifetime
     992                 :     OwnedRange owned_awaitables = std::forward<R>(awaitables); // rvalue argument: move; lvalue argument: copy
     993                 : 
     994                 :     detail::when_any_homogeneous_state<void> state(count);
     995                 :     // The launcher only receives pointers; both pointees are locals of this coroutine.
     996                 :     co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
     997                 :     // A stored winner exception takes precedence over returning the index.
     998                 :     if(state.core_.winner_exception_)
     999                 :         std::rethrow_exception(state.core_.winner_exception_);
    1000                 : 
    1001                 :     co_return state.core_.winner_index_;
    1002               6 : }
    1003                 : 
    1004                 : } // namespace capy
    1005                 : } // namespace boost
    1006                 : 
    1007                 : #endif
        

Generated by: LCOV version 2.3