libs/corosio/include/boost/corosio/io_stream.hpp

100.0% Lines (35/35) 100.0% Functions (27/27) 100.0% Branches (6/6)
libs/corosio/include/boost/corosio/io_stream.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_IO_STREAM_HPP
11 #define BOOST_COROSIO_IO_STREAM_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/corosio/io_object.hpp>
15 #include <boost/capy/io_result.hpp>
16 #include <boost/corosio/io_buffer_param.hpp>
17 #include <boost/capy/ex/executor_ref.hpp>
18 #include <system_error>
19
20 #include <coroutine>
21 #include <cstddef>
22 #include <stop_token>
23
24 namespace boost::corosio {
25
26 /** Platform stream with read/write operations.
27
28 This base class provides the fundamental async read and write
29 operations for kernel-level stream I/O. Derived classes wrap
30 OS-specific stream implementations (sockets, pipes, etc.) and
31 satisfy @ref capy::ReadStream and @ref capy::WriteStream concepts.
32
33 @par Semantics
34 Concrete classes wrap direct platform I/O completed by the kernel.
35 Functions taking `io_stream&` signal "platform implementation
36 required" - use this when you need actual kernel I/O rather than
37 a mock or test double.
38
39 For generic stream algorithms that work with test mocks,
40 use `template<capy::Stream S>` instead of `io_stream&`.
41
42 @par Thread Safety
43 Distinct objects: Safe.
44 Shared objects: Unsafe. All calls to a single stream must be made
45 from the same implicit or explicit serialization context.
46
47 @par Example
48 @code
49 // Read until buffer full or EOF
50 capy::task<> read_all( io_stream& stream, std::span<char> buf )
51 {
52 std::size_t total = 0;
53 while( total < buf.size() )
54 {
55 auto [ec, n] = co_await stream.read_some(
56 capy::buffer( buf.data() + total, buf.size() - total ) );
57 if( ec == capy::cond::eof )
58 break;
59 if( ec.failed() )
60 capy::detail::throw_system_error( ec );
61 total += n;
62 }
63 }
64 @endcode
65
66 @see capy::Stream, capy::ReadStream, capy::WriteStream, tcp_socket
67 */
68 class BOOST_COROSIO_DECL io_stream : public io_object
69 {
70 public:
71 /** Asynchronously read data from the stream.
72
73 This operation suspends the calling coroutine and initiates a
74 kernel-level read. The coroutine resumes when the operation
75 completes.
76
77 @li The operation completes when:
78 @li At least one byte has been read into the buffer sequence
79 @li The peer closes the connection (EOF)
80 @li An error occurs
81 @li The operation is cancelled via stop token or `cancel()`
82
83 @par Concurrency
84 At most one write operation may be in flight concurrently with
85 this read. No other read operations may be in flight until this
86 operation completes. Note that concurrent in-flight operations
87 does not imply the initiating calls may be made concurrently;
88 all calls must be serialized.
89
90 @par Cancellation
91 Supports cancellation via `std::stop_token` propagated through
92 the IoAwaitable protocol, or via the I/O object's `cancel()`
93 member. When cancelled, the operation completes with an error
94 that compares equal to `capy::cond::canceled`.
95
96 @par Preconditions
97 The stream must be open and connected.
98
99 @param buffers The buffer sequence to read data into. The caller
100 retains ownership and must ensure validity until the
101 operation completes.
102
103 @return An awaitable yielding `(error_code, std::size_t)`.
104 On success, `bytes_transferred` contains the number of bytes
105 read. Compare error codes to conditions, not specific values:
106 @li `capy::cond::eof` - Peer closed connection (TCP FIN)
107 @li `capy::cond::canceled` - Operation was cancelled
108
109 @par Example
110 @code
111 // Simple read with error handling
112 auto [ec, n] = co_await stream.read_some( capy::buffer( buf ) );
113 if( ec == capy::cond::eof )
114 co_return; // Connection closed gracefully
115 if( ec.failed() )
116 capy::detail::throw_system_error( ec );
117 process( buf, n );
118 @endcode
119
120 @note This operation may read fewer bytes than the buffer
121 capacity. Use a loop or `capy::async_read` to read an
122 exact amount.
123
124 @see write_some, capy::async_read
125 */
126 template<capy::MutableBufferSequence MB>
127 165221 auto read_some(MB const& buffers)
128 {
129 165221 return read_some_awaitable<MB>(*this, buffers);
130 }
131
132 /** Asynchronously write data to the stream.
133
134 This operation suspends the calling coroutine and initiates a
135 kernel-level write. The coroutine resumes when the operation
136 completes.
137
138 @li The operation completes when:
139 @li At least one byte has been written from the buffer sequence
140 @li An error occurs (including connection reset by peer)
141 @li The operation is cancelled via stop token or `cancel()`
142
143 @par Concurrency
144 At most one read operation may be in flight concurrently with
145 this write. No other write operations may be in flight until
146 this operation completes. Note that concurrent in-flight
147 operations does not imply the initiating calls may be made
148 concurrently; all calls must be serialized.
149
150 @par Cancellation
151 Supports cancellation via `std::stop_token` propagated through
152 the IoAwaitable protocol, or via the I/O object's `cancel()`
153 member. When cancelled, the operation completes with an error
154 that compares equal to `capy::cond::canceled`.
155
156 @par Preconditions
157 The stream must be open and connected.
158
159 @param buffers The buffer sequence containing data to write.
160 The caller retains ownership and must ensure validity
161 until the operation completes.
162
163 @return An awaitable yielding `(error_code, std::size_t)`.
164 On success, `bytes_transferred` contains the number of bytes
165 written. Compare error codes to conditions, not specific
166 values:
167 @li `capy::cond::canceled` - Operation was cancelled
168 @li `std::errc::broken_pipe` - Peer closed connection
169
170 @par Example
171 @code
172 // Write all data
173 std::string_view data = "Hello, World!";
174 std::size_t written = 0;
175 while( written < data.size() )
176 {
177 auto [ec, n] = co_await stream.write_some(
178 capy::buffer( data.data() + written,
179 data.size() - written ) );
180 if( ec.failed() )
181 capy::detail::throw_system_error( ec );
182 written += n;
183 }
184 @endcode
185
186 @note This operation may write fewer bytes than the buffer
187 contains. Use a loop or `capy::async_write` to write
188 all data.
189
190 @see read_some, capy::async_write
191 */
192 template<capy::ConstBufferSequence CB>
193 164979 auto write_some(CB const& buffers)
194 {
195 164979 return write_some_awaitable<CB>(*this, buffers);
196 }
197
198 protected:
199 /// Awaitable for async read operations.
200 template<class MutableBufferSequence>
201 struct read_some_awaitable
202 {
203 io_stream& ios_;
204 MutableBufferSequence buffers_;
205 std::stop_token token_;
206 mutable std::error_code ec_;
207 mutable std::size_t bytes_transferred_ = 0;
208
209 165221 read_some_awaitable(
210 io_stream& ios,
211 MutableBufferSequence buffers) noexcept
212 165221 : ios_(ios)
213 165221 , buffers_(std::move(buffers))
214 {
215 165221 }
216
217 165221 bool await_ready() const noexcept
218 {
219 165221 return token_.stop_requested();
220 }
221
222 165221 capy::io_result<std::size_t> await_resume() const noexcept
223 {
224
2/2
✓ Branch 1 taken 197 times.
✓ Branch 2 taken 165024 times.
165221 if (token_.stop_requested())
225 197 return {make_error_code(std::errc::operation_canceled), 0};
226 165024 return {ec_, bytes_transferred_};
227 }
228
229 165221 auto await_suspend(
230 std::coroutine_handle<> h,
231 capy::executor_ref ex,
232 std::stop_token token) -> std::coroutine_handle<>
233 {
234 165221 token_ = std::move(token);
235
1/1
✓ Branch 4 taken 165221 times.
165221 return ios_.get().read_some(h, ex, buffers_, token_, &ec_, &bytes_transferred_);
236 }
237 };
238
239 /// Awaitable for async write operations.
240 template<class ConstBufferSequence>
241 struct write_some_awaitable
242 {
243 io_stream& ios_;
244 ConstBufferSequence buffers_;
245 std::stop_token token_;
246 mutable std::error_code ec_;
247 mutable std::size_t bytes_transferred_ = 0;
248
249 164979 write_some_awaitable(
250 io_stream& ios,
251 ConstBufferSequence buffers) noexcept
252 164979 : ios_(ios)
253 164979 , buffers_(std::move(buffers))
254 {
255 164979 }
256
257 164979 bool await_ready() const noexcept
258 {
259 164979 return token_.stop_requested();
260 }
261
262 164979 capy::io_result<std::size_t> await_resume() const noexcept
263 {
264
2/2
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 164978 times.
164979 if (token_.stop_requested())
265 1 return {make_error_code(std::errc::operation_canceled), 0};
266 164978 return {ec_, bytes_transferred_};
267 }
268
269 164979 auto await_suspend(
270 std::coroutine_handle<> h,
271 capy::executor_ref ex,
272 std::stop_token token) -> std::coroutine_handle<>
273 {
274 164979 token_ = std::move(token);
275
1/1
✓ Branch 4 taken 164979 times.
164979 return ios_.get().write_some(h, ex, buffers_, token_, &ec_, &bytes_transferred_);
276 }
277 };
278
279 public:
280 /** Platform-specific stream implementation interface.
281
282 Derived classes implement this interface to provide kernel-level
283 read and write operations for each supported platform (IOCP,
284 epoll, kqueue, io_uring).
285 */
286 struct io_stream_impl : io_object_impl
287 {
288 /// Initiate platform read operation.
289 virtual std::coroutine_handle<> read_some(
290 std::coroutine_handle<>,
291 capy::executor_ref,
292 io_buffer_param,
293 std::stop_token,
294 std::error_code*,
295 std::size_t*) = 0;
296
297 /// Initiate platform write operation.
298 virtual std::coroutine_handle<> write_some(
299 std::coroutine_handle<>,
300 capy::executor_ref,
301 io_buffer_param,
302 std::stop_token,
303 std::error_code*,
304 std::size_t*) = 0;
305 };
306
307 protected:
308 /// Construct stream bound to the given execution context.
309 explicit
310 10031 io_stream(
311 capy::execution_context& ctx) noexcept
312 10031 : io_object(ctx)
313 {
314 10031 }
315
316 private:
317 /// Return implementation downcasted to stream interface.
318 330200 io_stream_impl& get() const noexcept
319 {
320 330200 return *static_cast<io_stream_impl*>(impl_);
321 }
322 };
323
324 } // namespace boost::corosio
325
326 #endif
327