LCOV - code coverage report
Current view: top level - capy/io - any_write_stream.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 94.1 % 101 95 6
Test Date: 2026-02-17 18:14:47 Functions: 91.5 % 59 54 5

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/capy
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
      11                 : #define BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
      12                 : 
      13                 : #include <boost/capy/detail/config.hpp>
      14                 : #include <boost/capy/detail/await_suspend_helper.hpp>
      15                 : #include <boost/capy/buffers.hpp>
      16                 : #include <boost/capy/buffers/buffer_array.hpp>
      17                 : #include <boost/capy/concept/io_awaitable.hpp>
      18                 : #include <boost/capy/concept/write_stream.hpp>
      19                 : #include <coroutine>
      20                 : #include <boost/capy/ex/io_env.hpp>
      21                 : #include <boost/capy/io_result.hpp>
      22                 : 
      23                 : #include <concepts>
      24                 : #include <coroutine>
      25                 : #include <cstddef>
      26                 : #include <new>
      27                 : #include <span>
      28                 : #include <stop_token>
      29                 : #include <system_error>
      30                 : #include <utility>
      31                 : 
      32                 : namespace boost {
      33                 : namespace capy {
      34                 : 
      35                 : /** Type-erased wrapper for any WriteStream.
      36                 : 
      37                 :     This class provides type erasure for any type satisfying the
      38                 :     @ref WriteStream concept, enabling runtime polymorphism for
      39                 :     write operations. It uses cached awaitable storage to achieve
      40                 :     zero steady-state allocation after construction.
      41                 : 
      42                 :     The wrapper supports two construction modes:
      43                 :     - **Owning**: Pass by value to transfer ownership. The wrapper
      44                 :       allocates storage and owns the stream.
      45                 :     - **Reference**: Pass a pointer to wrap without ownership. The
      46                 :       pointed-to stream must outlive this wrapper.
      47                 : 
      48                 :     @par Awaitable Preallocation
      49                 :     The constructor preallocates storage for the type-erased awaitable.
      50                 :     This reserves all virtual address space at server startup
      51                 :     so memory usage can be measured up front, rather than
      52                 :     allocating piecemeal as traffic arrives.
      53                 : 
      54                 :     @par Immediate Completion
      55                 :     Operations complete immediately without suspending when the
      56                 :     buffer sequence is empty, or when the underlying stream's
      57                 :     awaitable reports readiness via `await_ready`.
      58                 : 
      59                 :     @par Thread Safety
      60                 :     Not thread-safe. Concurrent operations on the same wrapper
      61                 :     are undefined behavior.
      62                 : 
      63                 :     @par Example
      64                 :     @code
      65                 :     // Owning - takes ownership of the stream
      66                 :     any_write_stream stream(socket{ioc});
      67                 : 
      68                 :     // Reference - wraps without ownership
      69                 :     socket sock(ioc);
      70                 :     any_write_stream stream(&sock);
      71                 : 
      72                 :     const_buffer buf(data, size);
      73                 :     auto [ec, n] = co_await stream.write_some(std::span(&buf, 1));
      74                 :     @endcode
      75                 : 
      76                 :     @see any_read_stream, any_stream, WriteStream
      77                 : */
      78                 : class any_write_stream
      79                 : {
      80                 :     struct vtable;
      81                 : 
      82                 :     template<WriteStream S>
      83                 :     struct vtable_for_impl;
      84                 : 
      85                 :     // ordered for cache line coherence
      86                 :     void* stream_ = nullptr;
      87                 :     vtable const* vt_ = nullptr;
      88                 :     void* cached_awaitable_ = nullptr;
      89                 :     void* storage_ = nullptr;
      90                 :     bool awaitable_active_ = false;
      91                 : 
      92                 : public:
      93                 :     /** Destructor.
      94                 : 
      95                 :         Destroys the owned stream (if any) and releases the cached
      96                 :         awaitable storage.
      97                 :     */
      98                 :     ~any_write_stream();
      99                 : 
     100                 :     /** Default constructor.
     101                 : 
     102                 :         Constructs an empty wrapper. Operations on a default-constructed
     103                 :         wrapper result in undefined behavior.
     104                 :     */
     105 HIT           1 :     any_write_stream() = default;
     106                 : 
     107                 :     /** Non-copyable.
     108                 : 
     109                 :         The awaitable cache is per-instance and cannot be shared.
     110                 :     */
     111                 :     any_write_stream(any_write_stream const&) = delete;
     112                 :     any_write_stream& operator=(any_write_stream const&) = delete;
     113                 : 
     114                 :     /** Move constructor.
     115                 : 
     116                 :         Transfers ownership of the wrapped stream (if owned) and
     117                 :         cached awaitable storage from `other`. After the move, `other` is
     118                 :         in a default-constructed state.
     119                 : 
     120                 :         @param other The wrapper to move from.
     121                 :     */
     122               2 :     any_write_stream(any_write_stream&& other) noexcept
     123               2 :         : stream_(std::exchange(other.stream_, nullptr))
     124               2 :         , vt_(std::exchange(other.vt_, nullptr))
     125               2 :         , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
     126               2 :         , storage_(std::exchange(other.storage_, nullptr))
     127               2 :         , awaitable_active_(std::exchange(other.awaitable_active_, false))
     128                 :     {
     129               2 :     }
     130                 : 
     131                 :     /** Move assignment operator.
     132                 : 
     133                 :         Destroys any owned stream and releases existing resources,
     134                 :         then transfers ownership from `other`.
     135                 : 
     136                 :         @param other The wrapper to move from.
     137                 :         @return Reference to this wrapper.
     138                 :     */
     139                 :     any_write_stream&
     140                 :     operator=(any_write_stream&& other) noexcept;
     141                 : 
     142                 :     /** Construct by taking ownership of a WriteStream.
     143                 : 
     144                 :         Allocates storage and moves the stream into this wrapper.
     145                 :         The wrapper owns the stream and will destroy it.
     146                 : 
     147                 :         @param s The stream to take ownership of.
     148                 :     */
     149                 :     template<WriteStream S>
     150                 :         requires (!std::same_as<std::decay_t<S>, any_write_stream>)
     151                 :     any_write_stream(S s);
     152                 : 
     153                 :     /** Construct by wrapping a WriteStream without ownership.
     154                 : 
     155                 :         Wraps the given stream by pointer. The stream must remain
     156                 :         valid for the lifetime of this wrapper.
     157                 : 
     158                 :         @param s Pointer to the stream to wrap.
     159                 :     */
     160                 :     template<WriteStream S>
     161                 :     any_write_stream(S* s);
     162                 : 
     163                 :     /** Check if the wrapper contains a valid stream.
     164                 : 
     165                 :         @return `true` if wrapping a stream, `false` if default-constructed
     166                 :             or moved-from.
     167                 :     */
     168                 :     bool
     169              21 :     has_value() const noexcept
     170                 :     {
     171              21 :         return stream_ != nullptr;
     172                 :     }
     173                 : 
     174                 :     /** Check if the wrapper contains a valid stream.
     175                 : 
     176                 :         @return `true` if wrapping a stream, `false` if default-constructed
     177                 :             or moved-from.
     178                 :     */
     179                 :     explicit
     180               3 :     operator bool() const noexcept
     181                 :     {
     182               3 :         return has_value();
     183                 :     }
     184                 : 
     185                 :     /** Initiate an asynchronous write operation.
     186                 : 
     187                 :         Writes data from the provided buffer sequence. The operation
     188                 :         completes when at least one byte has been written, or an error
     189                 :         occurs.
     190                 : 
     191                 :         @param buffers The buffer sequence containing data to write.
     192                 :             Passed by value to ensure the sequence lives in the
     193                 :             coroutine frame across suspension points.
     194                 : 
     195                 :         @return An awaitable yielding `(error_code,std::size_t)`.
     196                 : 
     197                 :         @par Immediate Completion
     198                 :         The operation completes immediately without suspending
     199                 :         the calling coroutine when:
     200                 :         @li The buffer sequence is empty, returning `{error_code{}, 0}`.
     201                 :         @li The underlying stream's awaitable reports immediate
     202                 :             readiness via `await_ready`.
     203                 : 
     204                 :         @note This is a partial operation and may not process the
     205                 :         entire buffer sequence. Use the composed @ref write algorithm
     206                 :         for guaranteed complete transfer.
     207                 : 
     208                 :         @par Preconditions
     209                 :         The wrapper must contain a valid stream (`has_value() == true`).
     210                 :     */
     211                 :     template<ConstBufferSequence CB>
     212                 :     auto
     213                 :     write_some(CB buffers);
     214                 : 
     215                 : protected:
     216                 :     /** Rebind to a new stream after move.
     217                 : 
     218                 :         Updates the internal pointer to reference a new stream object.
     219                 :         Used by owning wrappers after move assignment when the owned
     220                 :         object has moved to a new location.
     221                 : 
     222                 :         @param new_stream The new stream to bind to. Must be the same
     223                 :             type as the original stream.
     224                 : 
     225                 :         @note Terminates if called with a stream of different type
     226                 :             than the original.
     227                 :     */
     228                 :     template<WriteStream S>
     229                 :     void
     230                 :     rebind(S& new_stream) noexcept
     231                 :     {
     232                 :         if(vt_ != &vtable_for_impl<S>::value)
     233                 :             std::terminate();
     234                 :         stream_ = &new_stream;
     235                 :     }
     236                 : };
     237                 : 
     238                 : //----------------------------------------------------------
     239                 : 
     240                 : struct any_write_stream::vtable
     241                 : {
     242                 :     // ordered by call frequency for cache line coherence
     243                 :     void (*construct_awaitable)(
     244                 :         void* stream,
     245                 :         void* storage,
     246                 :         std::span<const_buffer const> buffers);
     247                 :     bool (*await_ready)(void*);
     248                 :     std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
     249                 :     io_result<std::size_t> (*await_resume)(void*);
     250                 :     void (*destroy_awaitable)(void*) noexcept;
     251                 :     std::size_t awaitable_size;
     252                 :     std::size_t awaitable_align;
     253                 :     void (*destroy)(void*) noexcept;
     254                 : };
     255                 : 
     256                 : template<WriteStream S>
     257                 : struct any_write_stream::vtable_for_impl
     258                 : {
     259                 :     using Awaitable = decltype(std::declval<S&>().write_some(
     260                 :         std::span<const_buffer const>{}));
     261                 : 
     262                 :     static void
     263               1 :     do_destroy_impl(void* stream) noexcept
     264                 :     {
     265               1 :         static_cast<S*>(stream)->~S();
     266               1 :     }
     267                 : 
     268                 :     static void
     269              75 :     construct_awaitable_impl(
     270                 :         void* stream,
     271                 :         void* storage,
     272                 :         std::span<const_buffer const> buffers)
     273                 :     {
     274              75 :         auto& s = *static_cast<S*>(stream);
     275              75 :         ::new(storage) Awaitable(s.write_some(buffers));
     276              75 :     }
     277                 : 
     278                 :     static constexpr vtable value = {
     279                 :         &construct_awaitable_impl,
     280              75 :         +[](void* p) {
     281              75 :             return static_cast<Awaitable*>(p)->await_ready();
     282                 :         },
     283               2 :         +[](void* p, std::coroutine_handle<> h, io_env const* env) {
     284               2 :             return detail::call_await_suspend(
     285               2 :                 static_cast<Awaitable*>(p), h, env);
     286                 :         },
     287              73 :         +[](void* p) {
     288              73 :             return static_cast<Awaitable*>(p)->await_resume();
     289                 :         },
     290              77 :         +[](void* p) noexcept {
     291              12 :             static_cast<Awaitable*>(p)->~Awaitable();
     292                 :         },
     293                 :         sizeof(Awaitable),
     294                 :         alignof(Awaitable),
     295                 :         &do_destroy_impl
     296                 :     };
     297                 : };
     298                 : 
     299                 : //----------------------------------------------------------
     300                 : 
     301                 : inline
     302              95 : any_write_stream::~any_write_stream()
     303                 : {
     304              95 :     if(storage_)
     305                 :     {
     306               1 :         vt_->destroy(stream_);
     307               1 :         ::operator delete(storage_);
     308                 :     }
     309              95 :     if(cached_awaitable_)
     310                 :     {
     311              85 :         if(awaitable_active_)
     312               1 :             vt_->destroy_awaitable(cached_awaitable_);
     313              85 :         ::operator delete(cached_awaitable_);
     314                 :     }
     315              95 : }
     316                 : 
     317                 : inline any_write_stream&
     318               5 : any_write_stream::operator=(any_write_stream&& other) noexcept
     319                 : {
     320               5 :     if(this != &other)
     321                 :     {
     322               5 :         if(storage_)
     323                 :         {
     324 MIS           0 :             vt_->destroy(stream_);
     325               0 :             ::operator delete(storage_);
     326                 :         }
     327 HIT           5 :         if(cached_awaitable_)
     328                 :         {
     329               2 :             if(awaitable_active_)
     330               1 :                 vt_->destroy_awaitable(cached_awaitable_);
     331               2 :             ::operator delete(cached_awaitable_);
     332                 :         }
     333               5 :         stream_ = std::exchange(other.stream_, nullptr);
     334               5 :         vt_ = std::exchange(other.vt_, nullptr);
     335               5 :         cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
     336               5 :         storage_ = std::exchange(other.storage_, nullptr);
     337               5 :         awaitable_active_ = std::exchange(other.awaitable_active_, false);
     338                 :     }
     339               5 :     return *this;
     340                 : }
     341                 : 
     342                 : template<WriteStream S>
     343                 :     requires (!std::same_as<std::decay_t<S>, any_write_stream>)
     344               1 : any_write_stream::any_write_stream(S s)
     345               1 :     : vt_(&vtable_for_impl<S>::value)
     346                 : {
     347                 :     struct guard {
     348                 :         any_write_stream* self;
     349                 :         bool committed = false;
     350               1 :         ~guard() {
     351               1 :             if(!committed && self->storage_) {
     352 MIS           0 :                 self->vt_->destroy(self->stream_);
     353               0 :                 ::operator delete(self->storage_);
     354               0 :                 self->storage_ = nullptr;
     355               0 :                 self->stream_ = nullptr;
     356                 :             }
     357 HIT           1 :         }
     358               1 :     } g{this};
     359                 : 
     360               1 :     storage_ = ::operator new(sizeof(S));
     361               1 :     stream_ = ::new(storage_) S(std::move(s));
     362                 : 
     363                 :     // Preallocate the awaitable storage
     364               1 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     365                 : 
     366               1 :     g.committed = true;
     367               1 : }
     368                 : 
     369                 : template<WriteStream S>
     370              86 : any_write_stream::any_write_stream(S* s)
     371              86 :     : stream_(s)
     372              86 :     , vt_(&vtable_for_impl<S>::value)
     373                 : {
     374                 :     // Preallocate the awaitable storage
     375              86 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     376              86 : }
     377                 : 
     378                 : //----------------------------------------------------------
     379                 : 
     380                 : template<ConstBufferSequence CB>
     381                 : auto
     382              79 : any_write_stream::write_some(CB buffers)
     383                 : {
     384                 :     struct awaitable
     385                 :     {
     386                 :         any_write_stream* self_;
     387                 :         const_buffer_array<detail::max_iovec_> ba_;
     388                 : 
     389              79 :         awaitable(
     390                 :             any_write_stream* self,
     391                 :             CB const& buffers) noexcept
     392              79 :             : self_(self)
     393              79 :             , ba_(buffers)
     394                 :         {
     395              79 :         }
     396                 : 
     397                 :         bool
     398              79 :         await_ready() const noexcept
     399                 :         {
     400              79 :             return ba_.to_span().empty();
     401                 :         }
     402                 : 
     403                 :         std::coroutine_handle<>
     404              75 :         await_suspend(std::coroutine_handle<> h, io_env const* env)
     405                 :         {
     406              75 :             self_->vt_->construct_awaitable(
     407              75 :                 self_->stream_,
     408              75 :                 self_->cached_awaitable_,
     409              75 :                 ba_.to_span());
     410              75 :             self_->awaitable_active_ = true;
     411                 : 
     412              75 :             if(self_->vt_->await_ready(self_->cached_awaitable_))
     413              73 :                 return h;
     414                 : 
     415               2 :             return self_->vt_->await_suspend(
     416               2 :                 self_->cached_awaitable_, h, env);
     417                 :         }
     418                 : 
     419                 :         io_result<std::size_t>
     420              77 :         await_resume()
     421                 :         {
     422              77 :             if(!self_->awaitable_active_)
     423               4 :                 return {{}, 0};
     424                 :             struct guard {
     425                 :                 any_write_stream* self;
     426              73 :                 ~guard() {
     427              73 :                     self->vt_->destroy_awaitable(self->cached_awaitable_);
     428              73 :                     self->awaitable_active_ = false;
     429              73 :                 }
     430              73 :             } g{self_};
     431              73 :             return self_->vt_->await_resume(
     432              73 :                 self_->cached_awaitable_);
     433              73 :         }
     434                 :     };
     435              79 :     return awaitable{this, buffers};
     436                 : }
     437                 : 
     438                 : } // namespace capy
     439                 : } // namespace boost
     440                 : 
     441                 : #endif
        

Generated by: LCOV version 2.3