Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
11 : #define BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/corosio/io_object.hpp>
19 : #include <boost/corosio/endpoint.hpp>
20 : #include <boost/capy/ex/executor_ref.hpp>
21 : #include <boost/capy/coro.hpp>
22 : #include <boost/capy/error.hpp>
23 : #include <system_error>
24 :
25 : #include "src/detail/make_err.hpp"
26 : #include "src/detail/resume_coro.hpp"
27 : #include "src/detail/scheduler_op.hpp"
28 : #include "src/detail/endpoint_convert.hpp"
29 :
30 : #include <unistd.h>
31 : #include <errno.h>
32 :
33 : #include <atomic>
34 : #include <cstddef>
35 : #include <memory>
36 : #include <optional>
37 : #include <stop_token>
38 :
39 : #include <netinet/in.h>
40 : #include <sys/socket.h>
41 : #include <sys/uio.h>
42 :
43 : /*
44 : epoll Operation State
45 : =====================
46 :
47 : Each async I/O operation has a corresponding epoll_op-derived struct that
48 : holds the operation's state while it's in flight. The socket impl owns
49 : fixed slots for each operation type (conn_, rd_, wr_), so only one
50 : operation of each type can be pending per socket at a time.
51 :
52 : Persistent Registration
53 : -----------------------
54 : File descriptors are registered with epoll once (via descriptor_data) and
55 : stay registered until closed. The descriptor_data tracks which operations
56 : are pending (read_op, write_op, connect_op). When an event arrives, the
57 : reactor dispatches to the appropriate pending operation.
58 :
59 : Impl Lifetime Management
60 : ------------------------
61 : When cancel() posts an op to the scheduler's ready queue, the socket impl
62 : might be destroyed before the scheduler processes the op. The `impl_ptr`
63 : member holds a shared_ptr to the impl, keeping it alive until the op
64 : completes. This is set by cancel() and cleared in operator() after the
65 : coroutine is resumed.
66 :
67 : EOF Detection
68 : -------------
69 : For reads, 0 bytes with no error means EOF. But an empty user buffer also
70 : returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
71 :
72 : SIGPIPE Prevention
73 : ------------------
74 : Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
75 : SIGPIPE when the peer has closed.
76 : */
77 :
78 : namespace boost::corosio::detail {
79 :
80 : // Forward declarations
81 : class epoll_socket_impl;
82 : class epoll_acceptor_impl;
83 : struct epoll_op;
84 :
85 : /** Per-descriptor state for persistent epoll registration.
86 :
87 : Tracks pending operations for a file descriptor. The fd is registered
88 : once with epoll and stays registered until closed. Events are dispatched
89 : to the appropriate pending operation (EPOLLIN -> read_op, etc.).
90 :
91 : With edge-triggered epoll (EPOLLET), atomic operations are required to
92 : synchronize between operation registration and reactor event delivery.
93 : The read_ready/write_ready flags cache edge events that arrived before
94 : an operation was registered.
95 : */
96 : struct descriptor_data
97 : {
98 : /// Currently registered events (EPOLLIN, EPOLLOUT, etc.)
99 : std::uint32_t registered_events = 0;
100 :
101 : /// Pending read operation (nullptr if none)
102 : std::atomic<epoll_op*> read_op{nullptr};
103 :
104 : /// Pending write operation (nullptr if none)
105 : std::atomic<epoll_op*> write_op{nullptr};
106 :
107 : /// Pending connect operation (nullptr if none)
108 : std::atomic<epoll_op*> connect_op{nullptr};
109 :
110 : /// Cached read readiness (edge event arrived before op registered)
111 : std::atomic<bool> read_ready{false};
112 :
113 : /// Cached write readiness (edge event arrived before op registered)
114 : std::atomic<bool> write_ready{false};
115 :
116 : /// The file descriptor
117 : int fd = -1;
118 :
119 : /// Whether this descriptor is managed by persistent registration
120 : bool is_registered = false;
121 : };
122 :
123 : struct epoll_op : scheduler_op
124 : {
125 : struct canceller
126 : {
127 : epoll_op* op;
128 : void operator()() const noexcept;
129 : };
130 :
131 : capy::coro h;
132 : capy::executor_ref ex;
133 : std::error_code* ec_out = nullptr;
134 : std::size_t* bytes_out = nullptr;
135 :
136 : int fd = -1;
137 : int errn = 0;
138 : std::size_t bytes_transferred = 0;
139 :
140 : std::atomic<bool> cancelled{false};
141 : std::optional<std::stop_callback<canceller>> stop_cb;
142 :
143 : // Prevents use-after-free when socket is closed with pending ops.
144 : // See "Impl Lifetime Management" in file header.
145 : std::shared_ptr<void> impl_ptr;
146 :
147 : // For stop_token cancellation - pointer to owning socket/acceptor impl.
148 : // When stop is requested, we call back to the impl to perform actual I/O cancellation.
149 : epoll_socket_impl* socket_impl_ = nullptr;
150 : epoll_acceptor_impl* acceptor_impl_ = nullptr;
151 :
152 16383 : epoll_op()
153 16383 : {
154 16383 : data_ = this;
155 16383 : }
156 :
157 172763 : void reset() noexcept
158 : {
159 172763 : fd = -1;
160 172763 : errn = 0;
161 172763 : bytes_transferred = 0;
162 172763 : cancelled.store(false, std::memory_order_relaxed);
163 172763 : impl_ptr.reset();
164 172763 : socket_impl_ = nullptr;
165 172763 : acceptor_impl_ = nullptr;
166 172763 : }
167 :
168 167326 : void operator()() override
169 : {
170 167326 : stop_cb.reset();
171 :
172 167326 : if (ec_out)
173 : {
174 167326 : if (cancelled.load(std::memory_order_acquire))
175 205 : *ec_out = capy::error::canceled;
176 167121 : else if (errn != 0)
177 1 : *ec_out = make_err(errn);
178 167120 : else if (is_read_operation() && bytes_transferred == 0)
179 5 : *ec_out = capy::error::eof;
180 : else
181 167115 : *ec_out = {};
182 : }
183 :
184 167326 : if (bytes_out)
185 167326 : *bytes_out = bytes_transferred;
186 :
187 : // Move to stack before resuming coroutine. The coroutine might close
188 : // the socket, releasing the last wrapper ref. If impl_ptr were the
189 : // last ref and we destroyed it while still in operator(), we'd have
190 : // use-after-free. Moving to local ensures destruction happens at
191 : // function exit, after all member accesses are complete.
192 167326 : capy::executor_ref saved_ex( std::move( ex ) );
193 167326 : capy::coro saved_h( std::move( h ) );
194 167326 : auto prevent_premature_destruction = std::move(impl_ptr);
195 167326 : resume_coro(saved_ex, saved_h);
196 167326 : }
197 :
198 83596 : virtual bool is_read_operation() const noexcept { return false; }
199 : virtual void cancel() noexcept = 0;
200 :
201 0 : void destroy() override
202 : {
203 0 : stop_cb.reset();
204 0 : impl_ptr.reset();
205 0 : }
206 :
207 25022 : void request_cancel() noexcept
208 : {
209 25022 : cancelled.store(true, std::memory_order_release);
210 25022 : }
211 :
212 : void start(std::stop_token token)
213 : {
214 : cancelled.store(false, std::memory_order_release);
215 : stop_cb.reset();
216 : socket_impl_ = nullptr;
217 : acceptor_impl_ = nullptr;
218 :
219 : if (token.stop_possible())
220 : stop_cb.emplace(token, canceller{this});
221 : }
222 :
223 170041 : void start(std::stop_token token, epoll_socket_impl* impl)
224 : {
225 170041 : cancelled.store(false, std::memory_order_release);
226 170041 : stop_cb.reset();
227 170041 : socket_impl_ = impl;
228 170041 : acceptor_impl_ = nullptr;
229 :
230 170041 : if (token.stop_possible())
231 106 : stop_cb.emplace(token, canceller{this});
232 170041 : }
233 :
234 2722 : void start(std::stop_token token, epoll_acceptor_impl* impl)
235 : {
236 2722 : cancelled.store(false, std::memory_order_release);
237 2722 : stop_cb.reset();
238 2722 : socket_impl_ = nullptr;
239 2722 : acceptor_impl_ = impl;
240 :
241 2722 : if (token.stop_possible())
242 9 : stop_cb.emplace(token, canceller{this});
243 2722 : }
244 :
245 172636 : void complete(int err, std::size_t bytes) noexcept
246 : {
247 172636 : errn = err;
248 172636 : bytes_transferred = bytes;
249 172636 : }
250 :
251 0 : virtual void perform_io() noexcept {}
252 : };
253 :
254 :
255 : struct epoll_connect_op : epoll_op
256 : {
257 : endpoint target_endpoint;
258 :
259 2715 : void reset() noexcept
260 : {
261 2715 : epoll_op::reset();
262 2715 : target_endpoint = endpoint{};
263 2715 : }
264 :
265 2714 : void perform_io() noexcept override
266 : {
267 : // connect() completion status is retrieved via SO_ERROR, not return value
268 2714 : int err = 0;
269 2714 : socklen_t len = sizeof(err);
270 2714 : if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
271 0 : err = errno;
272 2714 : complete(err, 0);
273 2714 : }
274 :
275 : // Defined in sockets.cpp where epoll_socket_impl is complete
276 : void operator()() override;
277 : void cancel() noexcept override;
278 : };
279 :
280 :
281 : struct epoll_read_op : epoll_op
282 : {
283 : static constexpr std::size_t max_buffers = 16;
284 : iovec iovecs[max_buffers];
285 : int iovec_count = 0;
286 : bool empty_buffer_read = false;
287 :
288 83524 : bool is_read_operation() const noexcept override
289 : {
290 83524 : return !empty_buffer_read;
291 : }
292 :
293 83724 : void reset() noexcept
294 : {
295 83724 : epoll_op::reset();
296 83724 : iovec_count = 0;
297 83724 : empty_buffer_read = false;
298 83724 : }
299 :
300 51 : void perform_io() noexcept override
301 : {
302 51 : ssize_t n = ::readv(fd, iovecs, iovec_count);
303 51 : if (n >= 0)
304 51 : complete(0, static_cast<std::size_t>(n));
305 : else
306 0 : complete(errno, 0);
307 51 : }
308 :
309 : void cancel() noexcept override;
310 : };
311 :
312 :
313 : struct epoll_write_op : epoll_op
314 : {
315 : static constexpr std::size_t max_buffers = 16;
316 : iovec iovecs[max_buffers];
317 : int iovec_count = 0;
318 :
319 83602 : void reset() noexcept
320 : {
321 83602 : epoll_op::reset();
322 83602 : iovec_count = 0;
323 83602 : }
324 :
325 0 : void perform_io() noexcept override
326 : {
327 0 : msghdr msg{};
328 0 : msg.msg_iov = iovecs;
329 0 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
330 :
331 0 : ssize_t n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
332 0 : if (n >= 0)
333 0 : complete(0, static_cast<std::size_t>(n));
334 : else
335 0 : complete(errno, 0);
336 0 : }
337 :
338 : void cancel() noexcept override;
339 : };
340 :
341 :
342 : struct epoll_accept_op : epoll_op
343 : {
344 : int accepted_fd = -1;
345 : io_object::io_object_impl* peer_impl = nullptr;
346 : io_object::io_object_impl** impl_out = nullptr;
347 :
348 2722 : void reset() noexcept
349 : {
350 2722 : epoll_op::reset();
351 2722 : accepted_fd = -1;
352 2722 : peer_impl = nullptr;
353 2722 : impl_out = nullptr;
354 2722 : }
355 :
356 2711 : void perform_io() noexcept override
357 : {
358 2711 : sockaddr_in addr{};
359 2711 : socklen_t addrlen = sizeof(addr);
360 2711 : int new_fd = ::accept4(fd, reinterpret_cast<sockaddr*>(&addr),
361 : &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
362 :
363 2711 : if (new_fd >= 0)
364 : {
365 2711 : accepted_fd = new_fd;
366 2711 : complete(0, 0);
367 : }
368 : else
369 : {
370 0 : complete(errno, 0);
371 : }
372 2711 : }
373 :
374 : // Defined in acceptors.cpp where epoll_acceptor_impl is complete
375 : void operator()() override;
376 : void cancel() noexcept override;
377 : };
378 :
379 : } // namespace boost::corosio::detail
380 :
381 : #endif // BOOST_COROSIO_HAS_EPOLL
382 :
383 : #endif // BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
|