LCOV - code coverage report
Current view: top level - capy/test - stream.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 93.5 % 123 115 8
Test Date: 2026-02-17 18:14:47 Functions: 86.1 % 36 31 5

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/capy
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_CAPY_TEST_STREAM_HPP
      11                 : #define BOOST_CAPY_TEST_STREAM_HPP
      12                 : 
      13                 : #include <boost/capy/detail/config.hpp>
      14                 : #include <boost/capy/buffers.hpp>
      15                 : #include <boost/capy/buffers/buffer_copy.hpp>
      16                 : #include <boost/capy/buffers/make_buffer.hpp>
      17                 : #include <coroutine>
      18                 : #include <boost/capy/ex/io_env.hpp>
      19                 : #include <boost/capy/io_result.hpp>
      20                 : #include <boost/capy/error.hpp>
      21                 : #include <boost/capy/read.hpp>
      22                 : #include <boost/capy/task.hpp>
      23                 : #include <boost/capy/test/fuse.hpp>
      24                 : #include <boost/capy/test/run_blocking.hpp>
      25                 : 
      26                 : #include <memory>
      27                 : #include <stop_token>
      28                 : #include <string>
      29                 : #include <string_view>
      30                 : #include <utility>
      31                 : 
      32                 : namespace boost {
      33                 : namespace capy {
      34                 : namespace test {
      35                 : 
      36                 : /** A connected stream for testing bidirectional I/O.
      37                 : 
      38                 :     Streams are created in pairs via @ref make_stream_pair.
      39                 :     Data written to one end becomes available for reading on
      40                 :     the other. If no data is available when @ref read_some
      41                 :     is called, the calling coroutine suspends until the peer
      42                 :     calls @ref write_some. The shared @ref fuse enables error
      43                 :     injection at controlled points in both directions.
      44                 : 
      45                 :     When the fuse injects an error or throws on one end, the
      46                 :     other end is automatically closed: any suspended reader is
      47                 :     resumed with `error::eof`, and subsequent operations on
      48                 :     both ends return `error::eof`. Calling @ref close on one
      49                 :     end signals eof to the peer's reads after draining any
      50                 :     buffered data, while the peer may still write.
      51                 : 
      52                 :     @par Thread Safety
      53                 :     Single-threaded only. Both ends of the pair must be
      54                 :     accessed from the same thread. Concurrent access is
      55                 :     undefined behavior.
      56                 : 
      57                 :     @par Example
      58                 :     @code
      59                 :     fuse f;
      60                 :     auto [a, b] = make_stream_pair( f );
      61                 : 
      62                 :     auto r = f.armed( [&]( fuse& ) -> task<> {
      63                 :         auto [ec, n] = co_await a.write_some(
      64                 :             const_buffer( "hello", 5 ) );
      65                 :         if( ec )
      66                 :             co_return;
      67                 : 
      68                 :         char buf[32];
      69                 :         auto [ec2, n2] = co_await b.read_some(
      70                 :             mutable_buffer( buf, sizeof( buf ) ) );
      71                 :         if( ec2 )
      72                 :             co_return;
      73                 :         // buf contains "hello"
      74                 :     } );
      75                 :     @endcode
      76                 : 
      77                 :     @see make_stream_pair, fuse
      78                 : */
      79                 : class stream
      80                 : {
      81                 :     // Single-threaded only. No concurrent access to either
      82                 :     // end of the pair. Both streams and all operations must
      83                 :     // run on the same thread.
      84                 : 
      85                 :     struct half // receive-side state of one end; see state::sides
      86                 :     {
      87                 :         std::string buf; // bytes queued by the peer, not yet read
      88                 :         std::size_t max_read_size = std::size_t(-1); // per-read cap; default is the largest size_t
      89                 :         std::coroutine_handle<> pending_h{}; // reader suspended on this side, if any
      90                 :         executor_ref pending_ex; // executor that pending_h is posted to
      91                 :         bool eof = false; // set when the peer calls stream::close()
      92                 :     };
      93                 : 
      94                 :     struct state // shared by both ends of a pair
      95                 :     {
      96                 :         fuse f; // consulted by read_some and write_some on either end
      97                 :         bool closed = false; // set by close(); reads and writes then return eof
      98                 :         half sides[2]; // sides[i] is the receive half of the stream whose index_ is i
      99                 : 
     100 HIT         279 :         explicit state(fuse f_) noexcept
     101             837 :             : f(std::move(f_))
     102                 :         {
     103             279 :         }
     104                 : 
     105                 :         // Set closed and resume any suspended readers
     106                 :         // with eof on both sides.
     107             208 :         void close()
     108                 :         {
     109             208 :             closed = true;
     110             624 :             for(auto& side : sides)
     111                 :             {
     112             416 :                 if(side.pending_h)
     113                 :                 {
     114              12 :                     auto h = side.pending_h;
     115              12 :                     side.pending_h = {}; // clear the slot before posting
     116              12 :                     auto ex = side.pending_ex;
     117              12 :                     side.pending_ex = {};
     118              12 :                     ex.post(h); // the resumed reader sees closed and returns eof
     119                 :                 }
     120                 :             }
     121             208 :         }
     122                 :     };
     123                 : 
     124                 :     // Wraps the maybe_fail() call. If the guard is
     125                 :     // not disarmed before destruction (fuse returned
     126                 :     // an error, or threw an exception), closes both
     127                 :     // ends so any suspended peer gets eof.
     128                 :     struct close_guard
     129                 :     {
     130                 :         state* st; // non-owning; the state outlives the guard's scope
     131                 :         bool armed = true;
     132             298 :         void disarm() noexcept { armed = false; }
     133             506 :         ~close_guard() noexcept(false) { if(armed) st->close(); } // NOTE(review): if post() can throw, this terminates when run during unwinding - confirm
     134                 :     };
     135                 : 
     136                 :     std::shared_ptr<state> state_; // shared with the peer stream
     137                 :     int index_; // 0 or 1: this end's slot in state::sides
     138                 : 
     139             558 :     stream( // private: constructed only by the friend make_stream_pair
     140                 :         std::shared_ptr<state> sp,
     141                 :         int index) noexcept
     142             558 :         : state_(std::move(sp))
     143             558 :         , index_(index)
     144                 :     {
     145             558 :     }
     146                 : 
     147                 :     friend std::pair<stream, stream>
     148                 :     make_stream_pair(fuse);
     149                 : 
     150                 : public:
     151                 :     stream(stream const&) = delete;
     152                 :     stream& operator=(stream const&) = delete;
     153             658 :     stream(stream&&) = default; // a moved-from stream has a null state_
     154                 :     stream& operator=(stream&&) = default;
     155                 : 
     156                 :     /** Signal end-of-stream to the peer.
     157                 : 
     158                 :         Marks the peer's read direction as closed.
     159                 :         If the peer is suspended in @ref read_some,
     160                 :         it is resumed. The peer drains any buffered
     161                 :         data before receiving `error::eof`. Writes
     162                 :         from the peer are unaffected.
     163                 :     */
     164                 :     void
     165               3 :     close()
     166                 :     {
     167               3 :         int peer = 1 - index_; // the other end's slot (0 <-> 1)
     168               3 :         auto& side = state_->sides[peer];
     169               3 :         side.eof = true; // peer reads drain buf first, then get eof
     170               3 :         if(side.pending_h)
     171                 :         {
     172               1 :             auto h = side.pending_h;
     173               1 :             side.pending_h = {};
     174               1 :             auto ex = side.pending_ex;
     175               1 :             side.pending_ex = {};
     176               1 :             ex.post(h); // wake the suspended peer reader
     177                 :         }
     178               3 :     }
     179                 : 
     180                 :     /** Set the maximum bytes returned per read.
     181                 : 
     182                 :         Limits how many bytes @ref read_some returns in
     183                 :         a single call, simulating chunked network delivery.
     184                 :         Affects reads on this end only. The default is unlimited.
     185                 : 
     186                 :         @param n Maximum bytes per read.
     187                 :     */
     188                 :     void
     189              54 :     set_max_read_size(std::size_t n) noexcept
     190                 :     {
     191              54 :         state_->sides[index_].max_read_size = n;
     192              54 :     }
     193                 : 
     194                 :     /** Asynchronously read data from the stream.
     195                 : 
     196                 :         Transfers up to `buffer_size(buffers)` bytes (capped by
     197                 :         @ref set_max_read_size) from data written by the peer.
     198                 :         If none is buffered, the calling coroutine suspends
     199                 :         until data arrives or the stream is closed. Empty
     200                 :         `buffers` complete at once with success and zero bytes.
     201                 :         The @ref fuse is consulted unless the pair is closed or
     202                 :         the peer signaled eof; if it fires, both ends are closed.
     203                 :         Returns `error::eof` if the pair is closed, or once the
     204                 :         buffer is drained after the peer's @ref close.
     205                 : 
     206                 :         @param buffers The mutable buffer sequence to receive data.
     207                 : 
     208                 :         @return An awaitable yielding `(error_code,std::size_t)`,
     209                 :             where the size is the number of bytes transferred.
     210                 :         @see fuse, close
     211                 :     */
     212                 :     template<MutableBufferSequence MB>
     213                 :     auto
     214             274 :     read_some(MB buffers)
     215                 :     {
     216                 :         struct awaitable
     217                 :         {
     218                 :             stream* self_;
     219                 :             MB buffers_;
     220                 : 
     221             274 :             bool await_ready() const noexcept
     222                 :             {
     223             274 :                 if(buffer_empty(buffers_)) // nothing to fill: complete without suspending
     224               8 :                     return true;
     225             266 :                 auto* st = self_->state_.get();
     226             266 :                 auto& side = st->sides[self_->index_];
     227             530 :                 return st->closed || side.eof || // suspend only when open with no data
     228             530 :                     !side.buf.empty();
     229                 :             }
     230                 : 
     231              25 :             std::coroutine_handle<> await_suspend(
     232                 :                 std::coroutine_handle<> h,
     233                 :                 io_env const* env) noexcept
     234                 :             {
     235              25 :                 auto& side = self_->state_->sides[
     236              25 :                     self_->index_];
     237              25 :                 side.pending_h = h; // overwrites any earlier handle: one pending reader per side
     238              25 :                 side.pending_ex = env->executor;
     239              25 :                 return std::noop_coroutine(); // nothing to transfer to; stay suspended until posted
     240                 :             }
     241                 : 
     242                 :             io_result<std::size_t>
     243             274 :             await_resume()
     244                 :             {
     245             274 :                 if(buffer_empty(buffers_))
     246               8 :                     return {{}, 0};
     247                 : 
     248             266 :                 auto* st = self_->state_.get();
     249             266 :                 auto& side = st->sides[
     250             266 :                     self_->index_];
     251                 : 
     252             266 :                 if(st->closed) // checked first: buffered data is not delivered once closed
     253              12 :                     return {error::eof, 0};
     254                 : 
     255             254 :                 if(side.eof && side.buf.empty()) // peer closed and buffer drained
     256               3 :                     return {error::eof, 0};
     257                 : 
     258             251 :                 if(!side.eof) // the fuse is skipped while draining after peer eof
     259                 :                 {
     260             251 :                     close_guard g{st}; // closes the pair if maybe_fail() errors or throws
     261             251 :                     auto ec = st->f.maybe_fail();
     262             198 :                     if(ec)
     263              53 :                         return {ec, 0};
     264             145 :                     g.disarm();
     265             251 :                 }
     266                 : 
     267             290 :                 std::size_t const n = buffer_copy(
     268             145 :                     buffers_, make_buffer(side.buf),
     269                 :                     side.max_read_size);
     270             145 :                 side.buf.erase(0, n); // drop the bytes just copied out
     271             145 :                 return {{}, n};
     272                 :             }
     273                 :         };
     274             274 :         return awaitable{this, buffers}; // awaitable points at *this; keep the stream in place while awaiting
     275                 :     }
     276                 : 
     277                 :     /** Asynchronously write data to the stream.
     278                 : 
     279                 :         Appends all `buffer_size(buffers)` bytes to the peer's
     280                 :         incoming buffer and never suspends. If the peer is
     281                 :         suspended in @ref read_some, it is resumed. Empty
     282                 :         `buffers` complete with success and zero bytes. The
     283                 :         @ref fuse is consulted unless the pair is already
     284                 :         closed; if it fires, both ends are closed. If the pair
     285                 :         is closed, returns `error::eof`. The returned
     286                 :         `std::size_t` is the number of bytes transferred.
     287                 : 
     288                 :         @param buffers The const buffer sequence containing
     289                 :             data to write.
     290                 : 
     291                 :         @return An awaitable yielding `(error_code,std::size_t)`.
     292                 : 
     293                 :         @see fuse, close
     294                 :     */
     295                 :     template<ConstBufferSequence CB>
     296                 :     auto
     297             259 :     write_some(CB buffers)
     298                 :     {
     299                 :         struct awaitable
     300                 :         {
     301                 :             stream* self_;
     302                 :             CB buffers_;
     303                 : 
     304             259 :             bool await_ready() const noexcept { return true; } // all work happens in await_resume
     305                 : 
     306 MIS           0 :             void await_suspend( // not reached: await_ready always returns true
     307                 :                 std::coroutine_handle<>,
     308                 :                 io_env const*) const noexcept
     309                 :             {
     310               0 :             }
     311                 : 
     312                 :             io_result<std::size_t>
     313 HIT         259 :             await_resume()
     314                 :             {
     315             259 :                 std::size_t n = buffer_size(buffers_);
     316             259 :                 if(n == 0) // empty write: success without touching the fuse
     317               4 :                     return {{}, 0};
     318                 : 
     319             255 :                 auto* st = self_->state_.get();
     320                 : 
     321             255 :                 if(st->closed)
     322 MIS           0 :                     return {error::eof, 0};
     323                 : 
     324 HIT         255 :                 close_guard g{st}; // closes the pair if maybe_fail() errors or throws
     325             255 :                 auto ec = st->f.maybe_fail();
     326             204 :                 if(ec)
     327              51 :                     return {ec, 0};
     328             153 :                 g.disarm();
     329                 : 
     330             153 :                 int peer = 1 - self_->index_;
     331             153 :                 auto& side = st->sides[peer];
     332                 : 
     333             153 :                 std::size_t const old_size = side.buf.size();
     334             153 :                 side.buf.resize(old_size + n); // grow, then copy into the new tail
     335             153 :                 buffer_copy(make_buffer(
     336             153 :                     side.buf.data() + old_size, n),
     337             153 :                     buffers_, n);
     338                 : 
     339             153 :                 if(side.pending_h)
     340                 :                 {
     341              12 :                     auto h = side.pending_h;
     342              12 :                     side.pending_h = {};
     343              12 :                     auto ex = side.pending_ex;
     344              12 :                     side.pending_ex = {};
     345              12 :                     ex.post(h); // wake the peer reader now that data is buffered
     346                 :                 }
     347                 : 
     348             153 :                 return {{}, n};
     349             255 :             }
     350                 :         };
     351             259 :         return awaitable{this, buffers}; // awaitable points at *this; keep the stream in place while awaiting
     352                 :     }
     353                 : 
     354                 :     /** Inject data into this stream's peer for reading.
     355                 : 
     356                 :         Appends data directly to the peer's incoming buffer,
     357                 :         bypassing the fuse and without checking for closed or
     358                 :         eof state. A peer suspended in @ref read_some is
     359                 :         resumed. This is test setup, not an operation under test.
     360                 : 
     361                 :         @param sv The data to inject.
     362                 : 
     363                 :         @see make_stream_pair
     364                 :     */
     365                 :     void
     366              86 :     provide(std::string_view sv)
     367                 :     {
     368              86 :         int peer = 1 - index_;
     369              86 :         auto& side = state_->sides[peer];
     370              86 :         side.buf.append(sv);
     371              86 :         if(side.pending_h)
     372                 :         {
     373 MIS           0 :             auto h = side.pending_h;
     374               0 :             side.pending_h = {};
     375               0 :             auto ex = side.pending_ex;
     376               0 :             side.pending_ex = {};
     377               0 :             ex.post(h);
     378                 :         }
     379 HIT          86 :     }
     380                 : 
     381                 :     /** Read from this stream and verify the content.
     382                 : 
     383                 :         Reads exactly `expected.size()` bytes from the stream
     384                 :         and compares against the expected string. The read goes
     385                 :         through the normal path including the fuse, run via `run_blocking`.
     386                 : 
     387                 :         @param expected The expected content.
     388                 : 
     389                 :         @return A pair of `(error_code, bool)`. The error_code
     390                 :             is set if a read error occurs (e.g. fuse injection).
     391                 :             The bool is true only if no error occurred and the data matches.
     392                 : 
     393                 :         @see provide
     394                 :     */
     395                 :     std::pair<std::error_code, bool>
     396              38 :     expect(std::string_view expected)
     397                 :     {
     398              38 :         std::error_code result;
     399              38 :         bool match = false; // stays false if the read fails
     400             141 :         run_blocking()([]( // captureless: every input is a coroutine parameter
     401                 :             stream& self,
     402                 :             std::string_view expected,
     403                 :             std::error_code& result,
     404                 :             bool& match) -> task<>
     405                 :         {
     406                 :             std::string buf(expected.size(), '\0');
     407                 :             auto [ec, n] = co_await read(
     408                 :                 self, mutable_buffer(
     409                 :                     buf.data(), buf.size()));
     410                 :             if(ec)
     411                 :             {
     412                 :                 result = ec;
     413                 :                 co_return;
     414                 :             }
     415                 :             match = (std::string_view(
     416                 :                 buf.data(), n) == expected);
     417             161 :         }(*this, expected, result, match));
     418              58 :         return {result, match};
     419                 :     }
     420                 : 
     421                 :     /** Return the stream's pending read data.
     422                 : 
     423                 :         Returns a view of the data waiting to be read
     424                 :         from this stream. This is a direct peek at the
     425                 :         internal buffer, bypassing the fuse.
     426                 : 
     427                 :         @return A view of the pending data.
     428                 : 
     429                 :         @see provide, expect
     430                 :     */
     431                 :     std::string_view
     432                 :     data() const noexcept
     433                 :     {
     434                 :         return state_->sides[index_].buf; // view of the live buffer; invalid once the buffer changes
     435                 :     }
     436                 : };
     437                 : 
     438                 : /** Create a connected pair of test streams.
     439                 : 
     440                 :     Data written to one stream becomes readable on the other.
     441                 :     If a coroutine calls @ref stream::read_some when no data
     442                 :     is available, it suspends until the peer writes. Before
     443                 :     every read or write, the @ref fuse is consulted to
     444                 :     possibly inject an error for testing fault scenarios.
     445                 :     When the fuse fires, the peer is automatically closed.
     446                 : 
     447                 :     @param f The fuse used to inject errors during operations. Both streams share it.
     448                 : 
     449                 :     @return A pair of connected streams; `first` reads what `second` writes and vice versa.
     450                 : 
     451                 :     @see stream, fuse
     452                 : */
     453                 : inline std::pair<stream, stream>
     454             279 : make_stream_pair(fuse f = {})
     455                 : {
     456             279 :     auto sp = std::make_shared<stream::state>(std::move(f)); // one state object shared by both ends
     457             558 :     return {stream(sp, 0), stream(sp, 1)}; // the index selects each end's receive half in state::sides
     458             279 : }
     459                 : 
     460                 : } // test
     461                 : } // capy
     462                 : } // boost
     463                 : 
     464                 : #endif
        

Generated by: LCOV version 2.3