LCOV - code coverage report
Current view: top level - capy/io - any_read_stream.hpp (source / functions)
Test: coverage_remapped.info
Test Date: 2026-02-17 18:14:47

             Coverage    Total    Hit    Missed
Lines:         87.4 %       95     83        12
Functions:     79.6 %       54     43        11

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/capy
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_CAPY_IO_ANY_READ_STREAM_HPP
      11                 : #define BOOST_CAPY_IO_ANY_READ_STREAM_HPP
      12                 : 
      13                 : #include <boost/capy/detail/config.hpp>
      14                 : #include <boost/capy/detail/await_suspend_helper.hpp>
      15                 : #include <boost/capy/buffers.hpp>
      16                 : #include <boost/capy/buffers/buffer_array.hpp>
      17                 : #include <boost/capy/concept/io_awaitable.hpp>
      18                 : #include <boost/capy/concept/read_stream.hpp>
      19                 : #include <boost/capy/ex/io_env.hpp>
      20                 : #include <boost/capy/io_result.hpp>
      21                 : 
      22                 : #include <concepts>
      23                 : #include <coroutine>
      24                 : #include <cstddef>
      25                 : #include <new>
      26                 : #include <span>
      27                 : #include <stop_token>
      28                 : #include <system_error>
      29                 : #include <utility>
      30                 : 
      31                 : namespace boost {
      32                 : namespace capy {
      33                 : 
      34                 : /** Type-erased wrapper for any ReadStream.
      35                 : 
      36                 :     This class provides type erasure for any type satisfying the
      37                 :     @ref ReadStream concept, enabling runtime polymorphism for
      38                 :     read operations. It uses cached awaitable storage to achieve
      39                 :     zero steady-state allocation after construction.
      40                 : 
      41                 :     The wrapper supports two construction modes:
      42                 :     - **Owning**: Pass by value to transfer ownership. The wrapper
      43                 :       allocates storage and owns the stream.
      44                 :     - **Reference**: Pass a pointer to wrap without ownership. The
      45                 :       pointed-to stream must outlive this wrapper.
      46                 : 
      47                 :     @par Awaitable Preallocation
      48                 :     The constructor preallocates storage for the type-erased awaitable.
      49                 :     This reserves all virtual address space at server startup
      50                 :     so memory usage can be measured up front, rather than
      51                 :     allocating piecemeal as traffic arrives.
      52                 : 
      53                 :     @par Immediate Completion
      54                 :     When the underlying stream's awaitable reports ready immediately
      55                 :     (e.g. buffered data already available), the wrapper skips
      56                 :     coroutine suspension entirely and returns the result inline.
      57                 : 
      58                 :     @par Thread Safety
      59                 :     Not thread-safe. Concurrent operations on the same wrapper
      60                 :     are undefined behavior.
      61                 : 
      62                 :     @par Example
      63                 :     @code
      64                 :     // Owning - takes ownership of the stream
      65                 :     any_read_stream stream(socket{ioc});
      66                 : 
      67                 :     // Reference - wraps without ownership
      68                 :     socket sock(ioc);
      69                 :     any_read_stream stream(&sock);
      70                 : 
      71                 :     mutable_buffer buf(data, size);
      72                 :     auto [ec, n] = co_await stream.read_some(buf);
      73                 :     @endcode
      74                 : 
      75                 :     @see any_write_stream, any_stream, ReadStream
      76                 : */
      77                 : class any_read_stream
      78                 : {
      79                 :     struct vtable;
      80                 : 
      81                 :     template<ReadStream S>
      82                 :     struct vtable_for_impl;
      83                 : 
      84                 :     // ordered for cache line locality
      85                 :     void* stream_ = nullptr;
      86                 :     vtable const* vt_ = nullptr;
      87                 :     void* cached_awaitable_ = nullptr;
      88                 :     void* storage_ = nullptr;
      89                 :     bool awaitable_active_ = false;
      90                 : 
      91                 : public:
      92                 :     /** Destructor.
      93                 : 
      94                 :         Destroys the owned stream (if any) and releases the cached
      95                 :         awaitable storage.
      96                 :     */
      97                 :     ~any_read_stream();
      98                 : 
      99                 :     /** Default constructor.
     100                 : 
     101                 :         Constructs an empty wrapper. Operations on a default-constructed
     102                 :         wrapper result in undefined behavior.
     103                 :     */
     104 HIT           1 :     any_read_stream() = default;
     105                 : 
     106                 :     /** Non-copyable.
     107                 : 
     108                 :         The awaitable cache is per-instance and cannot be shared.
     109                 :     */
     110                 :     any_read_stream(any_read_stream const&) = delete;
     111                 :     any_read_stream& operator=(any_read_stream const&) = delete;
     112                 : 
     113                 :     /** Move constructor.
     114                 : 
     115                 :         Transfers ownership of the wrapped stream (if owned) and
     116                 :         cached awaitable storage from `other`. After the move, `other` is
     117                 :         in a default-constructed state.
     118                 : 
     119                 :         @param other The wrapper to move from.
     120                 :     */
     121               2 :     any_read_stream(any_read_stream&& other) noexcept
     122               2 :         : stream_(std::exchange(other.stream_, nullptr))
     123               2 :         , vt_(std::exchange(other.vt_, nullptr))
     124               2 :         , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
     125               2 :         , storage_(std::exchange(other.storage_, nullptr))
     126               2 :         , awaitable_active_(std::exchange(other.awaitable_active_, false))
     127                 :     {
     128               2 :     }
     129                 : 
     130                 :     /** Move assignment operator.
     131                 : 
     132                 :         Destroys any owned stream and releases existing resources,
     133                 :         then transfers ownership from `other`.
     134                 : 
     135                 :         @param other The wrapper to move from.
     136                 :         @return Reference to this wrapper.
     137                 :     */
     138                 :     any_read_stream&
     139                 :     operator=(any_read_stream&& other) noexcept;
     140                 : 
     141                 :     /** Construct by taking ownership of a ReadStream.
     142                 : 
     143                 :         Allocates storage and moves the stream into this wrapper.
     144                 :         The wrapper owns the stream and will destroy it.
     145                 : 
     146                 :         @param s The stream to take ownership of.
     147                 :     */
     148                 :     template<ReadStream S>
     149                 :         requires (!std::same_as<std::decay_t<S>, any_read_stream>)
     150                 :     any_read_stream(S s);
     151                 : 
     152                 :     /** Construct by wrapping a ReadStream without ownership.
     153                 : 
     154                 :         Wraps the given stream by pointer. The stream must remain
     155                 :         valid for the lifetime of this wrapper.
     156                 : 
     157                 :         @param s Pointer to the stream to wrap.
     158                 :     */
     159                 :     template<ReadStream S>
     160                 :     any_read_stream(S* s);
     161                 : 
     162                 :     /** Check if the wrapper contains a valid stream.
     163                 : 
     164                 :         @return `true` if wrapping a stream, `false` if default-constructed
     165                 :             or moved-from.
     166                 :     */
     167                 :     bool
     168              25 :     has_value() const noexcept
     169                 :     {
     170              25 :         return stream_ != nullptr;
     171                 :     }
     172                 : 
     173                 :     /** Check if the wrapper contains a valid stream.
     174                 : 
     175                 :         @return `true` if wrapping a stream, `false` if default-constructed
     176                 :             or moved-from.
     177                 :     */
     178                 :     explicit
     179               3 :     operator bool() const noexcept
     180                 :     {
     181               3 :         return has_value();
     182                 :     }
     183                 : 
     184                 :     /** Initiate an asynchronous read operation.
     185                 : 
     186                 :         Reads data into the provided buffer sequence. The operation
     187                 :         completes when at least one byte has been read, or an error
     188                 :         occurs.
     189                 : 
     190                 :         @param buffers The buffer sequence to read into. Passed by
     191                 :             value to ensure the sequence lives in the coroutine frame
     192                 :             across suspension points.
     193                 : 
     194                 :         @return An awaitable yielding `io_result<std::size_t>`, i.e. `(error_code, std::size_t)`.
     195                 : 
     196                 :         @par Immediate Completion
     197                 :         The operation completes immediately without suspending
     198                 :         the calling coroutine when the underlying stream's
     199                 :         awaitable reports immediate readiness via `await_ready`.
     200                 : 
     201                 :         @note This is a partial operation and may not process the
     202                 :         entire buffer sequence. Use the composed @ref read algorithm
     203                 :         for guaranteed complete transfer.
     204                 : 
     205                 :         @par Preconditions
     206                 :         The wrapper must contain a valid stream (`has_value() == true`).
     207                 :         The caller must not call this function again after a prior
     208                 :         call returned an error (including EOF).
     209                 :     */
     210                 :     template<MutableBufferSequence MB>
     211                 :     auto
     212                 :     read_some(MB buffers);
     213                 : 
     214                 : protected:
     215                 :     /** Rebind to a new stream after move.
     216                 : 
     217                 :         Updates the internal pointer to reference a new stream object.
     218                 :         Used by owning wrappers after move assignment when the owned
     219                 :         object has moved to a new location.
     220                 : 
     221                 :         @param new_stream The new stream to bind to. Must be the same
     222                 :             type as the original stream.
     223                 : 
     224                 :         @note Terminates if called with a stream of a different type
     225                 :             than the original.
     226                 :     */
     227                 :     template<ReadStream S>
     228                 :     void
     229                 :     rebind(S& new_stream) noexcept
     230                 :     {
     231                 :         if(vt_ != &vtable_for_impl<S>::value)
     232                 :             std::terminate();
     233                 :         stream_ = &new_stream;
     234                 :     }
     235                 : };
     236                 : 
     237                 : //----------------------------------------------------------
     238                 : 
     239                 : struct any_read_stream::vtable
     240                 : {
     241                 :     // ordered by call frequency for cache line locality
     242                 :     void (*construct_awaitable)(
     243                 :         void* stream,
     244                 :         void* storage,
     245                 :         std::span<mutable_buffer const> buffers);
     246                 :     bool (*await_ready)(void*);
     247                 :     std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
     248                 :     io_result<std::size_t> (*await_resume)(void*);
     249                 :     void (*destroy_awaitable)(void*) noexcept;
     250                 :     std::size_t awaitable_size;
     251                 :     std::size_t awaitable_align;
     252                 :     void (*destroy)(void*) noexcept;
     253                 : };
     254                 : 
     255                 : template<ReadStream S>
     256                 : struct any_read_stream::vtable_for_impl
     257                 : {
     258                 :     using Awaitable = decltype(std::declval<S&>().read_some(
     259                 :         std::span<mutable_buffer const>{}));
     260                 : 
     261                 :     static void
     262               1 :     do_destroy_impl(void* stream) noexcept
     263                 :     {
     264               1 :         static_cast<S*>(stream)->~S();
     265               1 :     }
     266                 : 
     267                 :     static void
     268              91 :     construct_awaitable_impl(
     269                 :         void* stream,
     270                 :         void* storage,
     271                 :         std::span<mutable_buffer const> buffers)
     272                 :     {
     273              91 :         auto& s = *static_cast<S*>(stream);
     274              91 :         ::new(storage) Awaitable(s.read_some(buffers));
     275              91 :     }
     276                 : 
     277                 :     static constexpr vtable value = {
     278                 :         &construct_awaitable_impl,
     279              91 :         +[](void* p) {
     280              91 :             return static_cast<Awaitable*>(p)->await_ready();
     281                 :         },
     282 MIS           0 :         +[](void* p, std::coroutine_handle<> h, io_env const* env) {
     283               0 :             return detail::call_await_suspend(
     284               0 :                 static_cast<Awaitable*>(p), h, env);
     285                 :         },
     286 HIT          89 :         +[](void* p) {
     287              89 :             return static_cast<Awaitable*>(p)->await_resume();
     288                 :         },
     289              93 :         +[](void* p) noexcept {
     290              16 :             static_cast<Awaitable*>(p)->~Awaitable();
     291                 :         },
     292                 :         sizeof(Awaitable),
     293                 :         alignof(Awaitable),
     294                 :         &do_destroy_impl
     295                 :     };
     296                 : };
     297                 : 
     298                 : //----------------------------------------------------------
     299                 : 
     300                 : inline
     301             101 : any_read_stream::~any_read_stream()
     302                 : {
     303             101 :     if(storage_)
     304                 :     {
     305               1 :         vt_->destroy(stream_);
     306               1 :         ::operator delete(storage_);
     307                 :     }
     308             101 :     if(cached_awaitable_)
     309                 :     {
     310              91 :         if(awaitable_active_)
     311               1 :             vt_->destroy_awaitable(cached_awaitable_);
     312              91 :         ::operator delete(cached_awaitable_);
     313                 :     }
     314             101 : }
     315                 : 
     316                 : inline any_read_stream&
     317               5 : any_read_stream::operator=(any_read_stream&& other) noexcept
     318                 : {
     319               5 :     if(this != &other)
     320                 :     {
     321               5 :         if(storage_)
     322                 :         {
     323 MIS           0 :             vt_->destroy(stream_);
     324               0 :             ::operator delete(storage_);
     325                 :         }
     326 HIT           5 :         if(cached_awaitable_)
     327                 :         {
     328               2 :             if(awaitable_active_)
     329               1 :                 vt_->destroy_awaitable(cached_awaitable_);
     330               2 :             ::operator delete(cached_awaitable_);
     331                 :         }
     332               5 :         stream_ = std::exchange(other.stream_, nullptr);
     333               5 :         vt_ = std::exchange(other.vt_, nullptr);
     334               5 :         cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
     335               5 :         storage_ = std::exchange(other.storage_, nullptr);
     336               5 :         awaitable_active_ = std::exchange(other.awaitable_active_, false);
     337                 :     }
     338               5 :     return *this;
     339                 : }
     340                 : 
     341                 : template<ReadStream S>
     342                 :     requires (!std::same_as<std::decay_t<S>, any_read_stream>)
     343               1 : any_read_stream::any_read_stream(S s)
     344               1 :     : vt_(&vtable_for_impl<S>::value)
     345                 : {
     346                 :     struct guard {
     347                 :         any_read_stream* self;
     348                 :         bool committed = false;
     349               1 :         ~guard() {
     350               1 :             if(!committed && self->storage_) {
     351 MIS           0 :                 self->vt_->destroy(self->stream_);
     352               0 :                 ::operator delete(self->storage_);
     353               0 :                 self->storage_ = nullptr;
     354               0 :                 self->stream_ = nullptr;
     355                 :             }
     356 HIT           1 :         }
     357               1 :     } g{this};
     358                 : 
     359               1 :     storage_ = ::operator new(sizeof(S));
     360               1 :     stream_ = ::new(storage_) S(std::move(s));
     361                 : 
     362                 :     // Preallocate the awaitable storage
     363               1 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     364                 : 
     365               1 :     g.committed = true;
     366               1 : }
     367                 : 
     368                 : template<ReadStream S>
     369              92 : any_read_stream::any_read_stream(S* s)
     370              92 :     : stream_(s)
     371              92 :     , vt_(&vtable_for_impl<S>::value)
     372                 : {
     373                 :     // Preallocate the awaitable storage
     374              92 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     375              92 : }
     376                 : 
     377                 : //----------------------------------------------------------
     378                 : 
     379                 : template<MutableBufferSequence MB>
     380                 : auto
     381              91 : any_read_stream::read_some(MB buffers)
     382                 : {
     383                 :     // VFALCO in theory, we could use if constexpr to detect a
     384                 :     // span and then pass that through to read_some without the array
     385                 :     struct awaitable
     386                 :     {
     387                 :         any_read_stream* self_;
     388                 :         mutable_buffer_array<detail::max_iovec_> ba_;
     389                 : 
     390                 :         bool
     391              91 :         await_ready()
     392                 :         {
     393              91 :             self_->vt_->construct_awaitable(
     394              91 :                 self_->stream_,
     395              91 :                 self_->cached_awaitable_,
     396              91 :                 ba_.to_span());
     397              91 :             self_->awaitable_active_ = true;
     398                 : 
     399             182 :             return self_->vt_->await_ready(
     400              91 :                 self_->cached_awaitable_);
     401                 :         }
     402                 : 
     403                 :         std::coroutine_handle<>
     404 MIS           0 :         await_suspend(std::coroutine_handle<> h, io_env const* env)
     405                 :         {
     406               0 :             return self_->vt_->await_suspend(
     407               0 :                 self_->cached_awaitable_, h, env);
     408                 :         }
     409                 : 
     410                 :         io_result<std::size_t>
     411 HIT          89 :         await_resume()
     412                 :         {
     413                 :             struct guard {
     414                 :                 any_read_stream* self;
     415              89 :                 ~guard() {
     416              89 :                     self->vt_->destroy_awaitable(self->cached_awaitable_);
     417              89 :                     self->awaitable_active_ = false;
     418              89 :                 }
     419              89 :             } g{self_};
     420              89 :             return self_->vt_->await_resume(
     421             154 :                 self_->cached_awaitable_);
     422              89 :         }
     423                 :     };
     424                 :     return awaitable{this,
     425              91 :         mutable_buffer_array<detail::max_iovec_>(buffers)};
     426              91 : }
     427                 : 
     428                 : } // namespace capy
     429                 : } // namespace boost
     430                 : 
     431                 : #endif
        

Generated by: LCOV version 2.3