libs/corosio/src/corosio/src/detail/select/op.hpp

74.8% Lines (98/131) 84.2% Functions (16/19) 65.7% Branches (23/35)
libs/corosio/src/corosio/src/detail/select/op.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_DETAIL_SELECT_OP_HPP
11 #define BOOST_COROSIO_DETAIL_SELECT_OP_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/io_object.hpp>
19 #include <boost/corosio/endpoint.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <boost/capy/coro.hpp>
22 #include <boost/capy/error.hpp>
23 #include <system_error>
24
25 #include "src/detail/make_err.hpp"
26 #include "src/detail/scheduler_op.hpp"
27 #include "src/detail/endpoint_convert.hpp"
28
29 #include <unistd.h>
30 #include <errno.h>
31 #include <fcntl.h>
32
33 #include <atomic>
34 #include <cstddef>
35 #include <memory>
36 #include <optional>
37 #include <stop_token>
38
39 #include <netinet/in.h>
40 #include <sys/select.h>
41 #include <sys/socket.h>
42 #include <sys/uio.h>
43
44 /*
45 select Operation State
46 ======================
47
48 Each async I/O operation has a corresponding select_op-derived struct that
49 holds the operation's state while it's in flight. The socket impl owns
50 fixed slots for each operation type (conn_, rd_, wr_), so only one
51 operation of each type can be pending per socket at a time.
52
53 This mirrors the epoll_op design for consistency across backends.
54
55 Completion vs Cancellation Race
56 -------------------------------
57 The `registered` atomic uses a tri-state (unregistered, registering,
58 registered) to handle two races: (1) between register_fd() and the
59 reactor seeing an event, and (2) between reactor completion and cancel().
60
61 The registering state closes the window where an event could arrive
62 after register_fd() but before the state was advanced to registered.
63 The reactor and cancel() both treat registering the same as registered
63 when claiming.
64
65 Whoever atomically exchanges to unregistered "claims" the operation
66 and is responsible for completing it. The loser sees unregistered and
67 does nothing. The initiating thread uses compare_exchange to transition
68 from registering to registered; if this fails, the reactor or cancel
69 already claimed the op.
70
71 Impl Lifetime Management
72 ------------------------
73 When cancel() posts an op to the scheduler's ready queue, the socket impl
74 might be destroyed before the scheduler processes the op. The `impl_ptr`
75 member holds a shared_ptr to the impl, keeping it alive until the op
76 completes.
77
78 EOF Detection
79 -------------
80 For reads, 0 bytes with no error means EOF. But an empty user buffer also
81 returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
82
83 SIGPIPE Prevention
84 ------------------
85 Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
86 SIGPIPE when the peer has closed.
87 */
88
89 namespace boost::corosio::detail {
90
91 // Forward declarations for cancellation support
92 class select_socket_impl;
93 class select_acceptor_impl;
94
/** Lifecycle of an operation's registration with the reactor.

    A plain flag is not enough: an event can be delivered between the
    call to register_fd() and the point where the initiating thread
    records that registration finished. An operation is therefore put
    into the `registering` state before register_fd() is called, so an
    event seen by run_reactor() during that window is not dropped.
*/
enum class select_registration_state : std::uint8_t
{
    unregistered = 0, ///< Not registered with reactor
    registering  = 1, ///< register_fd() called, not yet confirmed
    registered   = 2  ///< Fully registered, ready for events
};
108
109 struct select_op : scheduler_op
110 {
111 struct canceller
112 {
113 select_op* op;
114 void operator()() const noexcept;
115 };
116
117 capy::coro h;
118 capy::executor_ref ex;
119 std::error_code* ec_out = nullptr;
120 std::size_t* bytes_out = nullptr;
121
122 int fd = -1;
123 int errn = 0;
124 std::size_t bytes_transferred = 0;
125
126 std::atomic<bool> cancelled{false};
127 std::atomic<select_registration_state> registered{select_registration_state::unregistered};
128 std::optional<std::stop_callback<canceller>> stop_cb;
129
130 // Prevents use-after-free when socket is closed with pending ops.
131 std::shared_ptr<void> impl_ptr;
132
133 // For stop_token cancellation - pointer to owning socket/acceptor impl.
134 select_socket_impl* socket_impl_ = nullptr;
135 select_acceptor_impl* acceptor_impl_ = nullptr;
136
137 13156 select_op()
138 13156 {
139 13156 data_ = this;
140 13156 }
141
142 167235 void reset() noexcept
143 {
144 167235 fd = -1;
145 167235 errn = 0;
146 167235 bytes_transferred = 0;
147 167235 cancelled.store(false, std::memory_order_relaxed);
148 167235 registered.store(select_registration_state::unregistered, std::memory_order_relaxed);
149 167235 impl_ptr.reset();
150 167235 socket_impl_ = nullptr;
151 167235 acceptor_impl_ = nullptr;
152 167235 }
153
154 162874 void operator()() override
155 {
156 162874 stop_cb.reset();
157
158
1/2
✓ Branch 0 taken 162874 times.
✗ Branch 1 not taken.
162874 if (ec_out)
159 {
160
2/2
✓ Branch 1 taken 201 times.
✓ Branch 2 taken 162673 times.
162874 if (cancelled.load(std::memory_order_acquire))
161 201 *ec_out = capy::error::canceled;
162
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 162672 times.
162673 else if (errn != 0)
163 1 *ec_out = make_err(errn);
164
6/6
✓ Branch 1 taken 81299 times.
✓ Branch 2 taken 81373 times.
✓ Branch 3 taken 5 times.
✓ Branch 4 taken 81294 times.
✓ Branch 5 taken 5 times.
✓ Branch 6 taken 162667 times.
162672 else if (is_read_operation() && bytes_transferred == 0)
165 5 *ec_out = capy::error::eof;
166 else
167 162667 *ec_out = {};
168 }
169
170
1/2
✓ Branch 0 taken 162874 times.
✗ Branch 1 not taken.
162874 if (bytes_out)
171 162874 *bytes_out = bytes_transferred;
172
173 // Move to stack before destroying the frame
174 162874 capy::executor_ref saved_ex( std::move( ex ) );
175 162874 capy::coro saved_h( std::move( h ) );
176 162874 impl_ptr.reset();
177
1/1
✓ Branch 1 taken 162874 times.
162874 saved_ex.dispatch( saved_h );
178 162874 }
179
180 81372 virtual bool is_read_operation() const noexcept { return false; }
181 virtual void cancel() noexcept = 0;
182
183 void destroy() override
184 {
185 stop_cb.reset();
186 impl_ptr.reset();
187 }
188
189 20159 void request_cancel() noexcept
190 {
191 20159 cancelled.store(true, std::memory_order_release);
192 20159 }
193
194 void start(std::stop_token token)
195 {
196 cancelled.store(false, std::memory_order_release);
197 stop_cb.reset();
198 socket_impl_ = nullptr;
199 acceptor_impl_ = nullptr;
200
201 if (token.stop_possible())
202 stop_cb.emplace(token, canceller{this});
203 }
204
205 165054 void start(std::stop_token token, select_socket_impl* impl)
206 {
207 165054 cancelled.store(false, std::memory_order_release);
208 165054 stop_cb.reset();
209 165054 socket_impl_ = impl;
210 165054 acceptor_impl_ = nullptr;
211
212
2/2
✓ Branch 1 taken 99 times.
✓ Branch 2 taken 164955 times.
165054 if (token.stop_possible())
213 99 stop_cb.emplace(token, canceller{this});
214 165054 }
215
216 2181 void start(std::stop_token token, select_acceptor_impl* impl)
217 {
218 2181 cancelled.store(false, std::memory_order_release);
219 2181 stop_cb.reset();
220 2181 socket_impl_ = nullptr;
221 2181 acceptor_impl_ = impl;
222
223
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2181 times.
2181 if (token.stop_possible())
224 stop_cb.emplace(token, canceller{this});
225 2181 }
226
227 167116 void complete(int err, std::size_t bytes) noexcept
228 {
229 167116 errn = err;
230 167116 bytes_transferred = bytes;
231 167116 }
232
233 virtual void perform_io() noexcept {}
234 };
235
236
237 struct select_connect_op : select_op
238 {
239 endpoint target_endpoint;
240
241 2180 void reset() noexcept
242 {
243 2180 select_op::reset();
244 2180 target_endpoint = endpoint{};
245 2180 }
246
247 2180 void perform_io() noexcept override
248 {
249 // connect() completion status is retrieved via SO_ERROR, not return value
250 2180 int err = 0;
251 2180 socklen_t len = sizeof(err);
252
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2180 times.
2180 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
253 err = errno;
254 2180 complete(err, 0);
255 2180 }
256
257 // Defined in sockets.cpp where select_socket_impl is complete
258 void operator()() override;
259 void cancel() noexcept override;
260 };
261
262
263 struct select_read_op : select_op
264 {
265 static constexpr std::size_t max_buffers = 16;
266 iovec iovecs[max_buffers];
267 int iovec_count = 0;
268 bool empty_buffer_read = false;
269
270 81300 bool is_read_operation() const noexcept override
271 {
272 81300 return !empty_buffer_read;
273 }
274
275 81497 void reset() noexcept
276 {
277 81497 select_op::reset();
278 81497 iovec_count = 0;
279 81497 empty_buffer_read = false;
280 81497 }
281
282 48 void perform_io() noexcept override
283 {
284 48 ssize_t n = ::readv(fd, iovecs, iovec_count);
285
1/2
✓ Branch 0 taken 48 times.
✗ Branch 1 not taken.
48 if (n >= 0)
286 48 complete(0, static_cast<std::size_t>(n));
287 else
288 complete(errno, 0);
289 48 }
290
291 void cancel() noexcept override;
292 };
293
294
295 struct select_write_op : select_op
296 {
297 static constexpr std::size_t max_buffers = 16;
298 iovec iovecs[max_buffers];
299 int iovec_count = 0;
300
301 81377 void reset() noexcept
302 {
303 81377 select_op::reset();
304 81377 iovec_count = 0;
305 81377 }
306
307 void perform_io() noexcept override
308 {
309 msghdr msg{};
310 msg.msg_iov = iovecs;
311 msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
312
313 ssize_t n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
314 if (n >= 0)
315 complete(0, static_cast<std::size_t>(n));
316 else
317 complete(errno, 0);
318 }
319
320 void cancel() noexcept override;
321 };
322
323
324 struct select_accept_op : select_op
325 {
326 int accepted_fd = -1;
327 io_object::io_object_impl* peer_impl = nullptr;
328 io_object::io_object_impl** impl_out = nullptr;
329
330 2181 void reset() noexcept
331 {
332 2181 select_op::reset();
333 2181 accepted_fd = -1;
334 2181 peer_impl = nullptr;
335 2181 impl_out = nullptr;
336 2181 }
337
338 2176 void perform_io() noexcept override
339 {
340 2176 sockaddr_in addr{};
341 2176 socklen_t addrlen = sizeof(addr);
342
343 // Note: select backend uses accept() + fcntl instead of accept4()
344 // for broader POSIX compatibility
345 2176 int new_fd = ::accept(fd, reinterpret_cast<sockaddr*>(&addr), &addrlen);
346
347
1/2
✓ Branch 0 taken 2176 times.
✗ Branch 1 not taken.
2176 if (new_fd >= 0)
348 {
349 // Reject fds that exceed select()'s FD_SETSIZE limit.
350 // Better to fail now than during later async operations.
351
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2176 times.
2176 if (new_fd >= FD_SETSIZE)
352 {
353 ::close(new_fd);
354 complete(EINVAL, 0);
355 return;
356 }
357
358 // Set non-blocking and close-on-exec flags.
359 // A non-blocking socket is essential for the async reactor;
360 // if we can't configure it, fail rather than risk blocking.
361 2176 int flags = ::fcntl(new_fd, F_GETFL, 0);
362
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2176 times.
2176 if (flags == -1)
363 {
364 int err = errno;
365 ::close(new_fd);
366 complete(err, 0);
367 return;
368 }
369
370
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2176 times.
2176 if (::fcntl(new_fd, F_SETFL, flags | O_NONBLOCK) == -1)
371 {
372 int err = errno;
373 ::close(new_fd);
374 complete(err, 0);
375 return;
376 }
377
378
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2176 times.
2176 if (::fcntl(new_fd, F_SETFD, FD_CLOEXEC) == -1)
379 {
380 int err = errno;
381 ::close(new_fd);
382 complete(err, 0);
383 return;
384 }
385
386 2176 accepted_fd = new_fd;
387 2176 complete(0, 0);
388 }
389 else
390 {
391 complete(errno, 0);
392 }
393 }
394
395 // Defined in acceptors.cpp where select_acceptor_impl is complete
396 void operator()() override;
397 void cancel() noexcept override;
398 };
399
400 } // namespace boost::corosio::detail
401
402 #endif // BOOST_COROSIO_HAS_SELECT
403
404 #endif // BOOST_COROSIO_DETAIL_SELECT_OP_HPP
405