Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_DETAIL_EPOLL_SOCKETS_HPP
11 : #define BOOST_COROSIO_DETAIL_EPOLL_SOCKETS_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/corosio/tcp_socket.hpp>
19 : #include <boost/capy/ex/executor_ref.hpp>
20 : #include <boost/capy/ex/execution_context.hpp>
21 : #include "src/detail/intrusive.hpp"
22 : #include "src/detail/socket_service.hpp"
23 :
24 : #include "src/detail/epoll/op.hpp"
25 : #include "src/detail/epoll/scheduler.hpp"
26 :
27 : #include <coroutine>
28 : #include <memory>
29 : #include <mutex>
30 : #include <unordered_map>
31 :
32 : /*
33 : epoll Socket Implementation
34 : ===========================
35 :
36 : Each I/O operation follows the same pattern:
37 : 1. Try the syscall immediately (non-blocking socket)
38 : 2. If it succeeds or fails with a real error, post to completion queue
39 : 3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
40 :
41 : This "try first" approach avoids unnecessary epoll round-trips for
42 : operations that can complete immediately (common for small reads/writes
43 : on fast local connections).
44 :
45 : One-Shot Registration
46 : ---------------------
47 : We use one-shot epoll registration: each operation registers, waits for
48 : one event, then unregisters. This simplifies the state machine since we
49 : don't need to track whether an fd is currently registered or handle
50 : re-arming. The tradeoff is slightly more epoll_ctl calls, but the
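51 : simplicity is worth it. NOTE(review): epoll_socket_impl::desc_data_ is documented as state for *persistent* epoll registration and the impl exposes update_epoll_events(); confirm whether this one-shot description is still accurate.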
52 :
53 : Cancellation
54 : ------------
55 : See op.hpp for the completion/cancellation race handling via the
56 : `registered` atomic. cancel() must complete pending operations (post
57 : them with cancelled flag) so coroutines waiting on them can resume.
58 : close_socket() calls cancel() first to ensure this.
59 :
60 : Impl Lifetime with shared_ptr
61 : -----------------------------
62 : Socket impls use enable_shared_from_this. The service owns impls via
63 : shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
64 : removal. When a user calls close(), we call cancel() which posts pending
65 : ops to the scheduler.
66 :
67 : CRITICAL: The posted ops must keep the impl alive until they complete.
68 : Otherwise the scheduler would process a freed op (use-after-free). The
69 : cancel() method captures shared_from_this() into op.impl_ptr before
70 : posting. When the op completes, impl_ptr is cleared, allowing the impl
71 : to be destroyed if no other references exist.
72 :
73 : Service Ownership
74 : -----------------
75 : epoll_socket_service owns all socket impls. destroy_impl() removes the
76 : shared_ptr from the map, but the impl may survive if ops still hold
77 : impl_ptr refs. shutdown() closes all sockets and clears the map; any
78 : in-flight ops will complete and release their refs.
79 : */
80 :
81 : namespace boost::corosio::detail {
82 :
83 : class epoll_socket_service;
84 : class epoll_socket_impl;
85 :
86 : /** Initiator coroutine for read operations.
87 :
88 : This coroutine receives control via symmetric transfer after the caller
89 : has fully suspended, then initiates the actual I/O. Uses cached frame
90 : allocation to avoid per-operation heap allocations.
91 : */
92 : struct read_initiator
93 : {
94 : struct promise_type
95 : {
96 : epoll_socket_impl* impl;
97 :
98 : /// Reuse cached frame to avoid per-operation heap allocation.
99 83723 : static void* operator new(std::size_t n, void*& cached, epoll_socket_impl*)
100 : {
101 83723 : if (!cached)
102 40 : cached = ::operator new(n);
103 83723 : return cached;
104 : }
105 :
106 : /// No-op - frame memory freed in socket destructor.
107 83723 : static void operator delete(void*) noexcept {}
108 :
109 83723 : std::suspend_always initial_suspend() noexcept { return {}; }
110 83723 : std::suspend_always final_suspend() noexcept { return {}; }
111 :
112 83723 : read_initiator get_return_object()
113 : {
114 83723 : return {std::coroutine_handle<promise_type>::from_promise(*this)};
115 : }
116 :
117 83723 : void return_void() {}
118 0 : void unhandled_exception() { std::terminate(); }
119 : };
120 :
121 : using handle_type = std::coroutine_handle<promise_type>;
122 : handle_type h;
123 : };
124 :
125 : /** Initiator coroutine for write operations.
126 :
127 : This coroutine receives control via symmetric transfer after the caller
128 : has fully suspended, then initiates the actual I/O. Uses cached frame
129 : allocation to avoid per-operation heap allocations.
130 : */
131 : struct write_initiator
132 : {
133 : struct promise_type
134 : {
135 : epoll_socket_impl* impl;
136 :
137 : /// Reuse cached frame to avoid per-operation heap allocation.
138 83601 : static void* operator new(std::size_t n, void*& cached, epoll_socket_impl*)
139 : {
140 83601 : if (!cached)
141 38 : cached = ::operator new(n);
142 83601 : return cached;
143 : }
144 :
145 : /// No-op - frame memory freed in socket destructor.
146 83601 : static void operator delete(void*) noexcept {}
147 :
148 83601 : std::suspend_always initial_suspend() noexcept { return {}; }
149 83601 : std::suspend_always final_suspend() noexcept { return {}; }
150 :
151 83601 : write_initiator get_return_object()
152 : {
153 83601 : return {std::coroutine_handle<promise_type>::from_promise(*this)};
154 : }
155 :
156 83601 : void return_void() {}
157 0 : void unhandled_exception() { std::terminate(); }
158 : };
159 :
160 : using handle_type = std::coroutine_handle<promise_type>;
161 : handle_type h;
162 : };
163 :
164 : // Coroutine factory functions (defined in sockets.cpp)
165 : read_initiator make_read_initiator(void*& cached, epoll_socket_impl* impl);
166 : write_initiator make_write_initiator(void*& cached, epoll_socket_impl* impl);
167 :
168 : /// Socket implementation for epoll backend.
169 : class epoll_socket_impl
170 : : public tcp_socket::socket_impl
171 : , public std::enable_shared_from_this<epoll_socket_impl>
172 : , public intrusive_list<epoll_socket_impl>::node
173 : {
174 : friend class epoll_socket_service;
175 :
176 : public:
177 : explicit epoll_socket_impl(epoll_socket_service& svc) noexcept;
178 : ~epoll_socket_impl();
179 :
180 : void release() override;
181 :
182 : void connect(
183 : std::coroutine_handle<>,
184 : capy::executor_ref,
185 : endpoint,
186 : std::stop_token,
187 : std::error_code*) override;
188 :
189 : std::coroutine_handle<> read_some(
190 : std::coroutine_handle<>,
191 : capy::executor_ref,
192 : io_buffer_param,
193 : std::stop_token,
194 : std::error_code*,
195 : std::size_t*) override;
196 :
197 : std::coroutine_handle<> write_some(
198 : std::coroutine_handle<>,
199 : capy::executor_ref,
200 : io_buffer_param,
201 : std::stop_token,
202 : std::error_code*,
203 : std::size_t*) override;
204 :
205 : std::error_code shutdown(tcp_socket::shutdown_type what) noexcept override;
206 :
207 0 : native_handle_type native_handle() const noexcept override { return fd_; }
208 :
209 : // Socket options
210 : std::error_code set_no_delay(bool value) noexcept override;
211 : bool no_delay(std::error_code& ec) const noexcept override;
212 :
213 : std::error_code set_keep_alive(bool value) noexcept override;
214 : bool keep_alive(std::error_code& ec) const noexcept override;
215 :
216 : std::error_code set_receive_buffer_size(int size) noexcept override;
217 : int receive_buffer_size(std::error_code& ec) const noexcept override;
218 :
219 : std::error_code set_send_buffer_size(int size) noexcept override;
220 : int send_buffer_size(std::error_code& ec) const noexcept override;
221 :
222 : std::error_code set_linger(bool enabled, int timeout) noexcept override;
223 : tcp_socket::linger_options linger(std::error_code& ec) const noexcept override;
224 :
225 16 : endpoint local_endpoint() const noexcept override { return local_endpoint_; }
226 16 : endpoint remote_endpoint() const noexcept override { return remote_endpoint_; }
227 : bool is_open() const noexcept { return fd_ >= 0; }
228 : void cancel() noexcept override;
229 : void cancel_single_op(epoll_op& op) noexcept;
230 : void close_socket() noexcept;
231 : void update_epoll_events() noexcept;
232 2713 : void set_socket(int fd) noexcept { fd_ = fd; }
233 5426 : void set_endpoints(endpoint local, endpoint remote) noexcept
234 : {
235 5426 : local_endpoint_ = local;
236 5426 : remote_endpoint_ = remote;
237 5426 : }
238 :
239 : epoll_connect_op conn_;
240 : epoll_read_op rd_;
241 : epoll_write_op wr_;
242 :
243 : /// Per-descriptor state for persistent epoll registration
244 : descriptor_data desc_data_;
245 :
246 : void* read_initiator_frame_ = nullptr;
247 : void* write_initiator_frame_ = nullptr;
248 : read_initiator::handle_type read_initiator_handle_;
249 : write_initiator::handle_type write_initiator_handle_;
250 :
251 : /// Execute the read I/O operation (called by initiator coroutine).
252 : void do_read_io();
253 :
254 : /// Execute the write I/O operation (called by initiator coroutine).
255 : void do_write_io();
256 :
257 : private:
258 : epoll_socket_service& svc_;
259 : int fd_ = -1;
260 : endpoint local_endpoint_;
261 : endpoint remote_endpoint_;
262 : };
263 :
264 : /** State for epoll socket service. */
265 : class epoll_socket_state
266 : {
267 : public:
268 184 : explicit epoll_socket_state(epoll_scheduler& sched) noexcept
269 184 : : sched_(sched)
270 : {
271 184 : }
272 :
273 : epoll_scheduler& sched_;
274 : std::mutex mutex_;
275 : intrusive_list<epoll_socket_impl> socket_list_;
276 : std::unordered_map<epoll_socket_impl*, std::shared_ptr<epoll_socket_impl>> socket_ptrs_;
277 : };
278 :
279 : /** epoll socket service implementation.
280 :
281 : Inherits from socket_service to enable runtime polymorphism.
282 : Uses key_type = socket_service for service lookup.
283 : */
284 : class epoll_socket_service : public socket_service
285 : {
286 : public:
287 : explicit epoll_socket_service(capy::execution_context& ctx);
288 : ~epoll_socket_service();
289 :
290 : epoll_socket_service(epoll_socket_service const&) = delete;
291 : epoll_socket_service& operator=(epoll_socket_service const&) = delete;
292 :
293 : void shutdown() override;
294 :
295 : tcp_socket::socket_impl& create_impl() override;
296 : void destroy_impl(tcp_socket::socket_impl& impl) override;
297 : std::error_code open_socket(tcp_socket::socket_impl& impl) override;
298 :
299 10878 : epoll_scheduler& scheduler() const noexcept { return state_->sched_; }
300 : void post(epoll_op* op);
301 : void work_started() noexcept;
302 : void work_finished() noexcept;
303 :
304 : private:
305 : std::unique_ptr<epoll_socket_state> state_;
306 : };
307 :
308 : // Backward compatibility alias
309 : using epoll_sockets = epoll_socket_service;
310 :
311 : } // namespace boost::corosio::detail
312 :
313 : #endif // BOOST_COROSIO_HAS_EPOLL
314 :
315 : #endif // BOOST_COROSIO_DETAIL_EPOLL_SOCKETS_HPP
|