LCOV - code coverage report
Current view: top level - capy/io - write_now.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 90.4 % 73 66 7
Test Date: 2026-02-17 18:14:47 Functions: 88.6 % 35 31 4

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/capy
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_CAPY_IO_WRITE_NOW_HPP
      11                 : #define BOOST_CAPY_IO_WRITE_NOW_HPP
      12                 : 
      13                 : #include <boost/capy/detail/config.hpp>
      14                 : #include <boost/capy/detail/await_suspend_helper.hpp>
      15                 : #include <boost/capy/buffers.hpp>
      16                 : #include <boost/capy/buffers/consuming_buffers.hpp>
      17                 : #include <boost/capy/concept/io_awaitable.hpp>
      18                 : #include <boost/capy/concept/write_stream.hpp>
      19                 : #include <coroutine>
      20                 : #include <boost/capy/ex/executor_ref.hpp>
      21                 : #include <boost/capy/ex/io_env.hpp>
      22                 : #include <boost/capy/io_result.hpp>
      23                 : 
      24                 : #include <cstddef>
      25                 : #include <exception>
      26                 : #include <new>
      27                 : #include <stop_token>
      28                 : #include <utility>
      29                 : 
      30                 : #ifndef BOOST_CAPY_WRITE_NOW_WORKAROUND
      31                 : # if defined(__GNUC__) && !defined(__clang__)
      32                 : #  define BOOST_CAPY_WRITE_NOW_WORKAROUND 1
      33                 : # else
      34                 : #  define BOOST_CAPY_WRITE_NOW_WORKAROUND 0
      35                 : # endif
      36                 : #endif
      37                 : 
      38                 : namespace boost {
      39                 : namespace capy {
      40                 : 
      41                 : /** Eagerly writes complete buffer sequences with frame caching.
      42                 : 
      43                 :     This class wraps a @ref WriteStream and provides an `operator()`
      44                 :     that writes an entire buffer sequence, attempting to complete
      45                 :     synchronously. If every `write_some` completes without suspending,
      46                 :     the entire operation finishes in `await_ready` with no coroutine
      47                 :     suspension.
      48                 : 
      49                 :     The class maintains a one-element coroutine frame cache. After
      50                 :     the first call, subsequent calls reuse the cached frame memory,
      51                 :     avoiding repeated allocation for the internal coroutine.
      52                 : 
      53                 :     @tparam Stream The stream type, must satisfy @ref WriteStream.
      54                 : 
      55                 :     @par Thread Safety
      56                 :     Distinct objects: Safe.
      57                 :     Shared objects: Unsafe.
      58                 : 
      59                 :     @par Preconditions
      60                 :     Only one operation may be outstanding at a time. A new call to
      61                 :     `operator()` must not be made until the previous operation has
      62                 :     completed (i.e., the returned awaitable has been fully consumed).
      63                 : 
      64                 :     @par Example
      65                 : 
      66                 :     @code
      67                 :     template< WriteStream Stream >
      68                 :     task<> send_messages( Stream& stream )
      69                 :     {
      70                 :         write_now wn( stream );
      71                 :         auto [ec1, n1] = co_await wn( make_buffer( "hello" ) );
      72                 :         if( ec1 )
      73                 :             detail::throw_system_error( ec1 );
      74                 :         auto [ec2, n2] = co_await wn( make_buffer( "world" ) );
      75                 :         if( ec2 )
      76                 :             detail::throw_system_error( ec2 );
      77                 :     }
      78                 :     @endcode
      79                 : 
      80                 :     @see write, write_some, WriteStream, ConstBufferSequence
      81                 : */
      82                 : template<class Stream>
      83                 :     requires WriteStream<Stream>
      84                 : class write_now
      85                 : {
      86                 :     Stream& stream_;
      87                 :     void* cached_frame_ = nullptr;
      88                 :     std::size_t cached_size_ = 0;
      89                 : 
      90                 :     struct [[nodiscard]] BOOST_CAPY_CORO_AWAIT_ELIDABLE
      91                 :         op_type
      92                 :     {
      93                 :         struct promise_type
      94                 :         {
      95                 :             io_result<std::size_t> result_;
      96                 :             std::exception_ptr ep_;
      97                 :             std::coroutine_handle<> cont_{nullptr};
      98                 :             io_env const* env_ = nullptr;
      99                 :             bool done_ = false;
     100                 : 
     101 HIT          68 :             op_type get_return_object()
     102                 :             {
     103                 :                 return op_type{
     104                 :                     std::coroutine_handle<
     105              68 :                         promise_type>::from_promise(*this)};
     106                 :             }
     107                 : 
     108              68 :             auto initial_suspend() noexcept
     109                 :             {
     110                 : #if BOOST_CAPY_WRITE_NOW_WORKAROUND
     111              68 :                 return std::suspend_always{};
     112                 : #else
     113                 :                 return std::suspend_never{};
     114                 : #endif
     115                 :             }
     116                 : 
     117              68 :             auto final_suspend() noexcept
     118                 :             {
     119                 :                 struct awaiter
     120                 :                 {
     121                 :                     promise_type* p_;
     122                 : 
     123              68 :                     bool await_ready() const noexcept
     124                 :                     {
     125              68 :                         return false;
     126                 :                     }
     127                 : 
     128              68 :                     std::coroutine_handle<> await_suspend(std::coroutine_handle<>) const noexcept
     129                 :                     {
     130              68 :                         p_->done_ = true;
     131              68 :                         if(!p_->cont_)
     132 MIS           0 :                             return std::noop_coroutine();
     133 HIT          68 :                         return p_->cont_;
     134                 :                     }
     135                 : 
     136 MIS           0 :                     void await_resume() const noexcept
     137                 :                     {
     138               0 :                     }
     139                 :                 };
     140 HIT          68 :                 return awaiter{this};
     141                 :             }
     142                 : 
     143              46 :             void return_value(
     144                 :                 io_result<std::size_t> r) noexcept
     145                 :             {
     146              46 :                 result_ = r;
     147              46 :             }
     148                 : 
     149              22 :             void unhandled_exception()
     150                 :             {
     151              22 :                 ep_ = std::current_exception();
     152              22 :             }
     153                 : 
     154                 :             std::suspend_always yield_value(int) noexcept
     155                 :             {
     156                 :                 return {};
     157                 :             }
     158                 : 
     159                 :             template<class A>
     160              84 :             auto await_transform(A&& a)
     161                 :             {
     162                 :                 using decayed = std::decay_t<A>;
     163                 :                 if constexpr (IoAwaitable<decayed>)
     164                 :                 {
     165                 :                     struct wrapper
     166                 :                     {
     167                 :                         decayed inner_;
     168                 :                         promise_type* p_;
     169                 : 
     170              84 :                         bool await_ready()
     171                 :                         {
     172              84 :                             return inner_.await_ready();
     173                 :                         }
     174                 : 
     175 MIS           0 :                         std::coroutine_handle<> await_suspend(std::coroutine_handle<> h)
     176                 :                         {
     177               0 :                             return detail::call_await_suspend(
     178                 :                                 &inner_, h,
     179               0 :                                 p_->env_);
     180                 :                         }
     181                 : 
     182 HIT          84 :                         decltype(auto) await_resume()
     183                 :                         {
     184              84 :                             return inner_.await_resume();
     185                 :                         }
     186                 :                     };
     187                 :                     return wrapper{
     188              84 :                         std::forward<A>(a), this};
     189                 :                 }
     190                 :                 else
     191                 :                 {
     192                 :                     return std::forward<A>(a);
     193                 :                 }
     194                 :             }
     195                 : 
     196                 :             static void*
     197              68 :             operator new(
     198                 :                 std::size_t size,
     199                 :                 write_now& self,
     200                 :                 auto&)
     201                 :             {
     202              68 :                 if(self.cached_frame_ &&
     203               4 :                     self.cached_size_ >= size)
     204               4 :                     return self.cached_frame_;
     205              64 :                 void* p = ::operator new(size);
     206              64 :                 if(self.cached_frame_)
     207 MIS           0 :                     ::operator delete(self.cached_frame_);
     208 HIT          64 :                 self.cached_frame_ = p;
     209              64 :                 self.cached_size_ = size;
     210              64 :                 return p;
     211                 :             }
     212                 : 
     213                 :             static void
     214              68 :             operator delete(void*, std::size_t) noexcept
     215                 :             {
     216              68 :             }
     217                 :         };
     218                 : 
     219                 :         std::coroutine_handle<promise_type> h_;
     220                 : 
     221             136 :         ~op_type()
     222                 :         {
     223             136 :             if(h_)
     224              68 :                 h_.destroy();
     225             136 :         }
     226                 : 
     227                 :         op_type(op_type const&) = delete;
     228                 :         op_type& operator=(op_type const&) = delete;
     229                 : 
     230              68 :         op_type(op_type&& other) noexcept
     231              68 :             : h_(std::exchange(other.h_, nullptr))
     232                 :         {
     233              68 :         }
     234                 : 
     235                 :         op_type& operator=(op_type&&) = delete;
     236                 : 
     237              68 :         bool await_ready() const noexcept
     238                 :         {
     239              68 :             return h_.promise().done_;
     240                 :         }
     241                 : 
     242              68 :         std::coroutine_handle<> await_suspend(
     243                 :             std::coroutine_handle<> cont,
     244                 :             io_env const* env)
     245                 :         {
     246              68 :             auto& p = h_.promise();
     247              68 :             p.cont_ = cont;
     248              68 :             p.env_ = env;
     249              68 :             return h_;
     250                 :         }
     251                 : 
     252              68 :         io_result<std::size_t> await_resume()
     253                 :         {
     254              68 :             auto& p = h_.promise();
     255              68 :             if(p.ep_)
     256              22 :                 std::rethrow_exception(p.ep_);
     257              46 :             return p.result_;
     258                 :         }
     259                 : 
     260                 :     private:
     261              68 :         explicit op_type(
     262                 :             std::coroutine_handle<promise_type> h)
     263              68 :             : h_(h)
     264                 :         {
     265              68 :         }
     266                 :     };
     267                 : 
     268                 : public:
     269                 :     /** Destructor. Frees the cached coroutine frame. */
     270              64 :     ~write_now()
     271                 :     {
     272              64 :         if(cached_frame_)
     273              64 :             ::operator delete(cached_frame_);
     274              64 :     }
     275                 : 
     276                 :     /** Construct from a stream reference.
     277                 : 
     278                 :         @param s The stream to write to. Must outlive this object.
     279                 :     */
     280                 :     explicit
     281              64 :     write_now(Stream& s) noexcept
     282              64 :         : stream_(s)
     283                 :     {
     284              64 :     }
     285                 : 
     286                 :     write_now(write_now const&) = delete;
     287                 :     write_now& operator=(write_now const&) = delete;
     288                 : 
     289                 :     /** Eagerly write the entire buffer sequence.
     290                 : 
     291                 :         Writes data to the stream by calling `write_some` repeatedly
     292                 :         until the entire buffer sequence is written or an error
     293                 :         occurs. The operation attempts to complete synchronously:
     294                 :         if every `write_some` completes without suspending, the
     295                 :         entire operation finishes in `await_ready`.
     296                 : 
     297                 :         When the fast path cannot complete, the coroutine suspends
     298                 :         and continues asynchronously. The internal coroutine frame
     299                 :         is cached and reused across calls.
     300                 : 
     301                 :         @param buffers The buffer sequence to write. Passed by
     302                 :             value to ensure the sequence lives in the coroutine
     303                 :             frame across suspension points.
     304                 : 
     305                 :         @return An awaitable yielding `(error_code,std::size_t)`.
     306                 :             On success, `n` equals `buffer_size(buffers)`. On
     307                 :             error, `n` is the number of bytes written before the
     308                 :             error. Compare error codes to conditions:
     309                 :             @li `cond::canceled` - Operation was cancelled
     310                 :             @li `std::errc::broken_pipe` - Peer closed connection
     311                 : 
     312                 :         @par Example
     313                 : 
     314                 :         @code
     315                 :         write_now wn( stream );
     316                 :         auto [ec, n] = co_await wn( make_buffer( body ) );
     317                 :         if( ec )
     318                 :             detail::throw_system_error( ec );
     319                 :         @endcode
     320                 : 
     321                 :         @see write, write_some, WriteStream
     322                 :     */
     323                 : // GCC falsely warns that the coroutine promise's
     324                 : // placement operator new(size_t, write_now&, auto&)
     325                 : // mismatches operator delete(void*, size_t). Per the
     326                 : // standard, coroutine deallocation lookup is separate.
     327                 : #if defined(__GNUC__) && !defined(__clang__)
     328                 : #pragma GCC diagnostic push
     329                 : #pragma GCC diagnostic ignored "-Wmismatched-new-delete"
     330                 : #endif
     331                 : 
     332                 : #if BOOST_CAPY_WRITE_NOW_WORKAROUND
     333                 :     template<ConstBufferSequence Buffers>
     334                 :     op_type
     335              68 :     operator()(Buffers buffers)
     336                 :     {
     337                 :         std::size_t const total_size = buffer_size(buffers);
     338                 :         std::size_t total_written = 0;
     339                 :         consuming_buffers cb(buffers);
     340                 :         while(total_written < total_size)
     341                 :         {
     342                 :             auto r =
     343                 :                 co_await stream_.write_some(cb);
     344                 :             if(r.ec)
     345                 :                 co_return io_result<std::size_t>{
     346                 :                     r.ec, total_written};
     347                 :             cb.consume(r.t1);
     348                 :             total_written += r.t1;
     349                 :         }
     350                 :         co_return io_result<std::size_t>{
     351                 :             {}, total_written};
     352             136 :     }
     353                 : #else
     354                 :     template<ConstBufferSequence Buffers>
     355                 :     op_type
     356                 :     operator()(Buffers buffers)
     357                 :     {
     358                 :         std::size_t const total_size = buffer_size(buffers);
     359                 :         std::size_t total_written = 0;
     360                 : 
     361                 :         // GCC ICE in expand_expr_real_1 (expr.cc:11376)
     362                 :         // when consuming_buffers spans a co_yield, so
     363                 :         // the GCC path uses a separate simple coroutine.
     364                 :         consuming_buffers cb(buffers);
     365                 :         while(total_written < total_size)
     366                 :         {
     367                 :             auto inner = stream_.write_some(cb);
     368                 :             if(!inner.await_ready())
     369                 :                 break;
     370                 :             auto r = inner.await_resume();
     371                 :             if(r.ec)
     372                 :                 co_return io_result<std::size_t>{
     373                 :                     r.ec, total_written};
     374                 :             cb.consume(r.t1);
     375                 :             total_written += r.t1;
     376                 :         }
     377                 : 
     378                 :         if(total_written >= total_size)
     379                 :             co_return io_result<std::size_t>{
     380                 :                 {}, total_written};
     381                 : 
     382                 :         co_yield 0;
     383                 : 
     384                 :         while(total_written < total_size)
     385                 :         {
     386                 :             auto r =
     387                 :                 co_await stream_.write_some(cb);
     388                 :             if(r.ec)
     389                 :                 co_return io_result<std::size_t>{
     390                 :                     r.ec, total_written};
     391                 :             cb.consume(r.t1);
     392                 :             total_written += r.t1;
     393                 :         }
     394                 :         co_return io_result<std::size_t>{
     395                 :             {}, total_written};
     396                 :     }
     397                 : #endif
     398                 : 
     399                 : #if defined(__GNUC__) && !defined(__clang__)
     400                 : #pragma GCC diagnostic pop
     401                 : #endif
     402                 : };
     403                 : 
     404                 : template<WriteStream S>
     405                 : write_now(S&) -> write_now<S>;
     406                 : 
     407                 : } // namespace capy
     408                 : } // namespace boost
     409                 : 
     410                 : #endif
        

Generated by: LCOV version 2.3