libs/corosio/src/corosio/src/detail/epoll/op.hpp

84.7% Lines (94/111) 84.2% Functions (16/19) 74.1% Branches (20/27)
libs/corosio/src/corosio/src/detail/epoll/op.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
11 #define BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/io_object.hpp>
19 #include <boost/corosio/endpoint.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <boost/capy/coro.hpp>
22 #include <boost/capy/error.hpp>
23 #include <system_error>
24
25 #include "src/detail/make_err.hpp"
26 #include "src/detail/resume_coro.hpp"
27 #include "src/detail/scheduler_op.hpp"
28 #include "src/detail/endpoint_convert.hpp"
29
30 #include <unistd.h>
31 #include <errno.h>
32
33 #include <atomic>
34 #include <cstddef>
35 #include <memory>
36 #include <optional>
37 #include <stop_token>
38
39 #include <netinet/in.h>
40 #include <sys/socket.h>
41 #include <sys/uio.h>
42
43 /*
44 epoll Operation State
45 =====================
46
47 Each async I/O operation has a corresponding epoll_op-derived struct that
48 holds the operation's state while it's in flight. The socket impl owns
49 fixed slots for each operation type (conn_, rd_, wr_), so only one
50 operation of each type can be pending per socket at a time.
51
52 Persistent Registration
53 -----------------------
54 File descriptors are registered with epoll once (via descriptor_data) and
55 stay registered until closed. The descriptor_data tracks which operations
56 are pending (read_op, write_op, connect_op). When an event arrives, the
57 reactor dispatches to the appropriate pending operation.
58
59 Impl Lifetime Management
60 ------------------------
61 When cancel() posts an op to the scheduler's ready queue, the socket impl
62 might be destroyed before the scheduler processes the op. The `impl_ptr`
63 member holds a shared_ptr to the impl, keeping it alive until the op
64 completes. This is set by cancel() and cleared in operator() after the
65 coroutine is resumed.
66
67 EOF Detection
68 -------------
69 For reads, 0 bytes with no error means EOF. But an empty user buffer also
70 returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
71
72 SIGPIPE Prevention
73 ------------------
74 Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
75 SIGPIPE when the peer has closed.
76 */
77
78 namespace boost::corosio::detail {
79
80 // Forward declarations
81 class epoll_socket_impl;
82 class epoll_acceptor_impl;
83 struct epoll_op;
84
85 /** Per-descriptor state for persistent epoll registration.
86
87 Tracks pending operations for a file descriptor. The fd is registered
88 once with epoll and stays registered until closed. Events are dispatched
89 to the appropriate pending operation (EPOLLIN -> read_op, etc.).
90
91 With edge-triggered epoll (EPOLLET), atomic operations are required to
92 synchronize between operation registration and reactor event delivery.
93 The read_ready/write_ready flags cache edge events that arrived before
94 an operation was registered.
95 */
96 struct descriptor_data
97 {
98 /// Currently registered events (EPOLLIN, EPOLLOUT, etc.)
99 std::uint32_t registered_events = 0;
100
101 /// Pending read operation (nullptr if none)
102 std::atomic<epoll_op*> read_op{nullptr};
103
104 /// Pending write operation (nullptr if none)
105 std::atomic<epoll_op*> write_op{nullptr};
106
107 /// Pending connect operation (nullptr if none)
108 std::atomic<epoll_op*> connect_op{nullptr};
109
110 /// Cached read readiness (edge event arrived before op registered)
111 std::atomic<bool> read_ready{false};
112
113 /// Cached write readiness (edge event arrived before op registered)
114 std::atomic<bool> write_ready{false};
115
116 /// The file descriptor
117 int fd = -1;
118
119 /// Whether this descriptor is managed by persistent registration
120 bool is_registered = false;
121 };
122
123 struct epoll_op : scheduler_op
124 {
125 struct canceller
126 {
127 epoll_op* op;
128 void operator()() const noexcept;
129 };
130
131 capy::coro h;
132 capy::executor_ref ex;
133 std::error_code* ec_out = nullptr;
134 std::size_t* bytes_out = nullptr;
135
136 int fd = -1;
137 int errn = 0;
138 std::size_t bytes_transferred = 0;
139
140 std::atomic<bool> cancelled{false};
141 std::optional<std::stop_callback<canceller>> stop_cb;
142
143 // Prevents use-after-free when socket is closed with pending ops.
144 // See "Impl Lifetime Management" in file header.
145 std::shared_ptr<void> impl_ptr;
146
147 // For stop_token cancellation - pointer to owning socket/acceptor impl.
148 // When stop is requested, we call back to the impl to perform actual I/O cancellation.
149 epoll_socket_impl* socket_impl_ = nullptr;
150 epoll_acceptor_impl* acceptor_impl_ = nullptr;
151
152 16383 epoll_op()
153 16383 {
154 16383 data_ = this;
155 16383 }
156
157 172763 void reset() noexcept
158 {
159 172763 fd = -1;
160 172763 errn = 0;
161 172763 bytes_transferred = 0;
162 172763 cancelled.store(false, std::memory_order_relaxed);
163 172763 impl_ptr.reset();
164 172763 socket_impl_ = nullptr;
165 172763 acceptor_impl_ = nullptr;
166 172763 }
167
168 167326 void operator()() override
169 {
170 167326 stop_cb.reset();
171
172
1/2
✓ Branch 0 taken 167326 times.
✗ Branch 1 not taken.
167326 if (ec_out)
173 {
174
2/2
✓ Branch 1 taken 205 times.
✓ Branch 2 taken 167121 times.
167326 if (cancelled.load(std::memory_order_acquire))
175 205 *ec_out = capy::error::canceled;
176
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 167120 times.
167121 else if (errn != 0)
177 1 *ec_out = make_err(errn);
178
6/6
✓ Branch 1 taken 83523 times.
✓ Branch 2 taken 83597 times.
✓ Branch 3 taken 5 times.
✓ Branch 4 taken 83518 times.
✓ Branch 5 taken 5 times.
✓ Branch 6 taken 167115 times.
167120 else if (is_read_operation() && bytes_transferred == 0)
179 5 *ec_out = capy::error::eof;
180 else
181 167115 *ec_out = {};
182 }
183
184
1/2
✓ Branch 0 taken 167326 times.
✗ Branch 1 not taken.
167326 if (bytes_out)
185 167326 *bytes_out = bytes_transferred;
186
187 // Move to stack before resuming coroutine. The coroutine might close
188 // the socket, releasing the last wrapper ref. If impl_ptr were the
189 // last ref and we destroyed it while still in operator(), we'd have
190 // use-after-free. Moving to local ensures destruction happens at
191 // function exit, after all member accesses are complete.
192 167326 capy::executor_ref saved_ex( std::move( ex ) );
193 167326 capy::coro saved_h( std::move( h ) );
194 167326 auto prevent_premature_destruction = std::move(impl_ptr);
195
1/1
✓ Branch 1 taken 167326 times.
167326 resume_coro(saved_ex, saved_h);
196 167326 }
197
198 83596 virtual bool is_read_operation() const noexcept { return false; }
199 virtual void cancel() noexcept = 0;
200
201 void destroy() override
202 {
203 stop_cb.reset();
204 impl_ptr.reset();
205 }
206
207 25022 void request_cancel() noexcept
208 {
209 25022 cancelled.store(true, std::memory_order_release);
210 25022 }
211
212 void start(std::stop_token token)
213 {
214 cancelled.store(false, std::memory_order_release);
215 stop_cb.reset();
216 socket_impl_ = nullptr;
217 acceptor_impl_ = nullptr;
218
219 if (token.stop_possible())
220 stop_cb.emplace(token, canceller{this});
221 }
222
223 170041 void start(std::stop_token token, epoll_socket_impl* impl)
224 {
225 170041 cancelled.store(false, std::memory_order_release);
226 170041 stop_cb.reset();
227 170041 socket_impl_ = impl;
228 170041 acceptor_impl_ = nullptr;
229
230
2/2
✓ Branch 1 taken 106 times.
✓ Branch 2 taken 169935 times.
170041 if (token.stop_possible())
231 106 stop_cb.emplace(token, canceller{this});
232 170041 }
233
234 2722 void start(std::stop_token token, epoll_acceptor_impl* impl)
235 {
236 2722 cancelled.store(false, std::memory_order_release);
237 2722 stop_cb.reset();
238 2722 socket_impl_ = nullptr;
239 2722 acceptor_impl_ = impl;
240
241
2/2
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 2713 times.
2722 if (token.stop_possible())
242 9 stop_cb.emplace(token, canceller{this});
243 2722 }
244
245 172636 void complete(int err, std::size_t bytes) noexcept
246 {
247 172636 errn = err;
248 172636 bytes_transferred = bytes;
249 172636 }
250
251 virtual void perform_io() noexcept {}
252 };
253
254
255 struct epoll_connect_op : epoll_op
256 {
257 endpoint target_endpoint;
258
259 2715 void reset() noexcept
260 {
261 2715 epoll_op::reset();
262 2715 target_endpoint = endpoint{};
263 2715 }
264
265 2714 void perform_io() noexcept override
266 {
267 // connect() completion status is retrieved via SO_ERROR, not return value
268 2714 int err = 0;
269 2714 socklen_t len = sizeof(err);
270
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2714 times.
2714 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
271 err = errno;
272 2714 complete(err, 0);
273 2714 }
274
275 // Defined in sockets.cpp where epoll_socket_impl is complete
276 void operator()() override;
277 void cancel() noexcept override;
278 };
279
280
281 struct epoll_read_op : epoll_op
282 {
283 static constexpr std::size_t max_buffers = 16;
284 iovec iovecs[max_buffers];
285 int iovec_count = 0;
286 bool empty_buffer_read = false;
287
288 83524 bool is_read_operation() const noexcept override
289 {
290 83524 return !empty_buffer_read;
291 }
292
293 83724 void reset() noexcept
294 {
295 83724 epoll_op::reset();
296 83724 iovec_count = 0;
297 83724 empty_buffer_read = false;
298 83724 }
299
300 51 void perform_io() noexcept override
301 {
302 51 ssize_t n = ::readv(fd, iovecs, iovec_count);
303
1/2
✓ Branch 0 taken 51 times.
✗ Branch 1 not taken.
51 if (n >= 0)
304 51 complete(0, static_cast<std::size_t>(n));
305 else
306 complete(errno, 0);
307 51 }
308
309 void cancel() noexcept override;
310 };
311
312
313 struct epoll_write_op : epoll_op
314 {
315 static constexpr std::size_t max_buffers = 16;
316 iovec iovecs[max_buffers];
317 int iovec_count = 0;
318
319 83602 void reset() noexcept
320 {
321 83602 epoll_op::reset();
322 83602 iovec_count = 0;
323 83602 }
324
325 void perform_io() noexcept override
326 {
327 msghdr msg{};
328 msg.msg_iov = iovecs;
329 msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
330
331 ssize_t n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
332 if (n >= 0)
333 complete(0, static_cast<std::size_t>(n));
334 else
335 complete(errno, 0);
336 }
337
338 void cancel() noexcept override;
339 };
340
341
342 struct epoll_accept_op : epoll_op
343 {
344 int accepted_fd = -1;
345 io_object::io_object_impl* peer_impl = nullptr;
346 io_object::io_object_impl** impl_out = nullptr;
347
348 2722 void reset() noexcept
349 {
350 2722 epoll_op::reset();
351 2722 accepted_fd = -1;
352 2722 peer_impl = nullptr;
353 2722 impl_out = nullptr;
354 2722 }
355
356 2711 void perform_io() noexcept override
357 {
358 2711 sockaddr_in addr{};
359 2711 socklen_t addrlen = sizeof(addr);
360 2711 int new_fd = ::accept4(fd, reinterpret_cast<sockaddr*>(&addr),
361 &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
362
363
1/2
✓ Branch 0 taken 2711 times.
✗ Branch 1 not taken.
2711 if (new_fd >= 0)
364 {
365 2711 accepted_fd = new_fd;
366 2711 complete(0, 0);
367 }
368 else
369 {
370 complete(errno, 0);
371 }
372 2711 }
373
374 // Defined in acceptors.cpp where epoll_acceptor_impl is complete
375 void operator()() override;
376 void cancel() noexcept override;
377 };
378
379 } // namespace boost::corosio::detail
380
381 #endif // BOOST_COROSIO_HAS_EPOLL
382
383 #endif // BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
384