Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #include <boost/corosio/detail/platform.hpp>
11 :
12 : #if BOOST_COROSIO_HAS_EPOLL
13 :
14 : #include "src/detail/epoll/acceptors.hpp"
15 : #include "src/detail/epoll/sockets.hpp"
16 : #include "src/detail/endpoint_convert.hpp"
17 : #include "src/detail/make_err.hpp"
18 :
19 : #include <errno.h>
20 : #include <netinet/in.h>
21 : #include <sys/epoll.h>
22 : #include <sys/socket.h>
23 : #include <unistd.h>
24 :
25 : namespace boost::corosio::detail {
26 :
27 : void
28 6 : epoll_accept_op::
29 : cancel() noexcept
30 : {
31 6 : if (acceptor_impl_)
32 6 : acceptor_impl_->cancel_single_op(*this);
33 : else
34 0 : request_cancel();
35 6 : }
36 :
37 : void
38 2722 : epoll_accept_op::
39 : operator()()
40 : {
41 2722 : stop_cb.reset();
42 :
43 2722 : bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
44 :
45 2722 : if (ec_out)
46 : {
47 2722 : if (cancelled.load(std::memory_order_acquire))
48 9 : *ec_out = capy::error::canceled;
49 2713 : else if (errn != 0)
50 0 : *ec_out = make_err(errn);
51 : else
52 2713 : *ec_out = {};
53 : }
54 :
55 2722 : if (success && accepted_fd >= 0)
56 : {
57 2713 : if (acceptor_impl_)
58 : {
59 2713 : auto* socket_svc = static_cast<epoll_acceptor_impl*>(acceptor_impl_)
60 2713 : ->service().socket_service();
61 2713 : if (socket_svc)
62 : {
63 2713 : auto& impl = static_cast<epoll_socket_impl&>(socket_svc->create_impl());
64 2713 : impl.set_socket(accepted_fd);
65 :
66 : // Register accepted socket with epoll (edge-triggered mode)
67 2713 : impl.desc_data_.fd = accepted_fd;
68 2713 : impl.desc_data_.read_op.store(nullptr, std::memory_order_relaxed);
69 2713 : impl.desc_data_.write_op.store(nullptr, std::memory_order_relaxed);
70 2713 : impl.desc_data_.connect_op.store(nullptr, std::memory_order_relaxed);
71 2713 : socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_data_);
72 :
73 2713 : sockaddr_in local_addr{};
74 2713 : socklen_t local_len = sizeof(local_addr);
75 2713 : sockaddr_in remote_addr{};
76 2713 : socklen_t remote_len = sizeof(remote_addr);
77 :
78 2713 : endpoint local_ep, remote_ep;
79 2713 : if (::getsockname(accepted_fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
80 2713 : local_ep = from_sockaddr_in(local_addr);
81 2713 : if (::getpeername(accepted_fd, reinterpret_cast<sockaddr*>(&remote_addr), &remote_len) == 0)
82 2713 : remote_ep = from_sockaddr_in(remote_addr);
83 :
84 2713 : impl.set_endpoints(local_ep, remote_ep);
85 :
86 2713 : if (impl_out)
87 2713 : *impl_out = &impl;
88 :
89 2713 : accepted_fd = -1;
90 : }
91 : else
92 : {
93 0 : if (ec_out && !*ec_out)
94 0 : *ec_out = make_err(ENOENT);
95 0 : ::close(accepted_fd);
96 0 : accepted_fd = -1;
97 0 : if (impl_out)
98 0 : *impl_out = nullptr;
99 : }
100 : }
101 : else
102 : {
103 0 : ::close(accepted_fd);
104 0 : accepted_fd = -1;
105 0 : if (impl_out)
106 0 : *impl_out = nullptr;
107 : }
108 2713 : }
109 : else
110 : {
111 9 : if (accepted_fd >= 0)
112 : {
113 0 : ::close(accepted_fd);
114 0 : accepted_fd = -1;
115 : }
116 :
117 9 : if (peer_impl)
118 : {
119 0 : peer_impl->release();
120 0 : peer_impl = nullptr;
121 : }
122 :
123 9 : if (impl_out)
124 9 : *impl_out = nullptr;
125 : }
126 :
127 : // Move to stack before resuming. See epoll_op::operator()() for rationale.
128 2722 : capy::executor_ref saved_ex( std::move( ex ) );
129 2722 : capy::coro saved_h( std::move( h ) );
130 2722 : auto prevent_premature_destruction = std::move(impl_ptr);
131 2722 : saved_ex.dispatch( saved_h );
132 2722 : }
133 :
134 66 : epoll_acceptor_impl::
135 66 : epoll_acceptor_impl(epoll_acceptor_service& svc) noexcept
136 66 : : svc_(svc)
137 : {
138 66 : }
139 :
140 : void
141 0 : epoll_acceptor_impl::
142 : update_epoll_events() noexcept
143 : {
144 0 : svc_.scheduler().update_descriptor_events(fd_, &desc_data_, 0);
145 0 : }
146 :
147 : void
148 66 : epoll_acceptor_impl::
149 : release()
150 : {
151 66 : close_socket();
152 66 : svc_.destroy_acceptor_impl(*this);
153 66 : }
154 :
155 : void
156 2722 : epoll_acceptor_impl::
157 : accept(
158 : std::coroutine_handle<> h,
159 : capy::executor_ref ex,
160 : std::stop_token token,
161 : std::error_code* ec,
162 : io_object::io_object_impl** impl_out)
163 : {
164 2722 : auto& op = acc_;
165 2722 : op.reset();
166 2722 : op.h = h;
167 2722 : op.ex = ex;
168 2722 : op.ec_out = ec;
169 2722 : op.impl_out = impl_out;
170 2722 : op.fd = fd_;
171 2722 : op.start(token, this);
172 :
173 2722 : sockaddr_in addr{};
174 2722 : socklen_t addrlen = sizeof(addr);
175 2722 : int accepted = ::accept4(fd_, reinterpret_cast<sockaddr*>(&addr),
176 : &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
177 :
178 2722 : if (accepted >= 0)
179 : {
180 2 : desc_data_.read_ready.store(false, std::memory_order_relaxed);
181 2 : op.accepted_fd = accepted;
182 2 : op.complete(0, 0);
183 2 : op.impl_ptr = shared_from_this();
184 2 : svc_.post(&op);
185 2722 : return;
186 : }
187 :
188 2720 : if (errno == EAGAIN || errno == EWOULDBLOCK)
189 : {
190 2720 : svc_.work_started();
191 2720 : op.impl_ptr = shared_from_this();
192 :
193 2720 : desc_data_.read_op.store(&op, std::memory_order_release);
194 : std::atomic_thread_fence(std::memory_order_seq_cst);
195 :
196 2720 : if (desc_data_.read_ready.exchange(false, std::memory_order_acquire))
197 : {
198 0 : auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel);
199 0 : if (claimed)
200 : {
201 0 : claimed->perform_io();
202 0 : if (claimed->errn == EAGAIN || claimed->errn == EWOULDBLOCK)
203 : {
204 0 : claimed->errn = 0;
205 0 : desc_data_.read_op.store(claimed, std::memory_order_release);
206 : }
207 : else
208 : {
209 0 : svc_.post(claimed);
210 0 : svc_.work_finished();
211 : }
212 0 : return;
213 : }
214 : }
215 :
216 2720 : if (op.cancelled.load(std::memory_order_acquire))
217 : {
218 0 : auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel);
219 0 : if (claimed)
220 : {
221 0 : svc_.post(claimed);
222 0 : svc_.work_finished();
223 : }
224 : }
225 2720 : return;
226 : }
227 :
228 0 : op.complete(errno, 0);
229 0 : op.impl_ptr = shared_from_this();
230 0 : svc_.post(&op);
231 : }
232 :
233 : void
234 133 : epoll_acceptor_impl::
235 : cancel() noexcept
236 : {
237 133 : std::shared_ptr<epoll_acceptor_impl> self;
238 : try {
239 133 : self = shared_from_this();
240 0 : } catch (const std::bad_weak_ptr&) {
241 0 : return;
242 0 : }
243 :
244 133 : acc_.request_cancel();
245 : // Use atomic exchange - only one of cancellation or reactor will succeed
246 133 : auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel);
247 133 : if (claimed == &acc_)
248 : {
249 3 : acc_.impl_ptr = self;
250 3 : svc_.post(&acc_);
251 3 : svc_.work_finished();
252 : }
253 133 : }
254 :
255 : void
256 6 : epoll_acceptor_impl::
257 : cancel_single_op(epoll_op& op) noexcept
258 : {
259 6 : op.request_cancel();
260 :
261 : // Use atomic exchange - only one of cancellation or reactor will succeed
262 6 : auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel);
263 6 : if (claimed == &op)
264 : {
265 : try {
266 6 : op.impl_ptr = shared_from_this();
267 0 : } catch (const std::bad_weak_ptr&) {}
268 6 : svc_.post(&op);
269 6 : svc_.work_finished();
270 : }
271 6 : }
272 :
273 : void
274 132 : epoll_acceptor_impl::
275 : close_socket() noexcept
276 : {
277 132 : cancel();
278 :
279 132 : if (fd_ >= 0)
280 : {
281 57 : if (desc_data_.registered_events != 0)
282 57 : svc_.scheduler().deregister_descriptor(fd_);
283 57 : ::close(fd_);
284 57 : fd_ = -1;
285 : }
286 :
287 132 : desc_data_.fd = -1;
288 132 : desc_data_.is_registered = false;
289 132 : desc_data_.read_op.store(nullptr, std::memory_order_relaxed);
290 132 : desc_data_.read_ready.store(false, std::memory_order_relaxed);
291 132 : desc_data_.write_ready.store(false, std::memory_order_relaxed);
292 132 : desc_data_.registered_events = 0;
293 :
294 : // Clear cached endpoint
295 132 : local_endpoint_ = endpoint{};
296 132 : }
297 :
298 184 : epoll_acceptor_service::
299 184 : epoll_acceptor_service(capy::execution_context& ctx)
300 184 : : ctx_(ctx)
301 184 : , state_(std::make_unique<epoll_acceptor_state>(ctx.use_service<epoll_scheduler>()))
302 : {
303 184 : }
304 :
305 368 : epoll_acceptor_service::
306 184 : ~epoll_acceptor_service()
307 : {
308 368 : }
309 :
310 : void
311 184 : epoll_acceptor_service::
312 : shutdown()
313 : {
314 184 : std::lock_guard lock(state_->mutex_);
315 :
316 184 : while (auto* impl = state_->acceptor_list_.pop_front())
317 0 : impl->close_socket();
318 :
319 184 : state_->acceptor_ptrs_.clear();
320 184 : }
321 :
322 : tcp_acceptor::acceptor_impl&
323 66 : epoll_acceptor_service::
324 : create_acceptor_impl()
325 : {
326 66 : auto impl = std::make_shared<epoll_acceptor_impl>(*this);
327 66 : auto* raw = impl.get();
328 :
329 66 : std::lock_guard lock(state_->mutex_);
330 66 : state_->acceptor_list_.push_back(raw);
331 66 : state_->acceptor_ptrs_.emplace(raw, std::move(impl));
332 :
333 66 : return *raw;
334 66 : }
335 :
336 : void
337 66 : epoll_acceptor_service::
338 : destroy_acceptor_impl(tcp_acceptor::acceptor_impl& impl)
339 : {
340 66 : auto* epoll_impl = static_cast<epoll_acceptor_impl*>(&impl);
341 66 : std::lock_guard lock(state_->mutex_);
342 66 : state_->acceptor_list_.remove(epoll_impl);
343 66 : state_->acceptor_ptrs_.erase(epoll_impl);
344 66 : }
345 :
346 : std::error_code
347 66 : epoll_acceptor_service::
348 : open_acceptor(
349 : tcp_acceptor::acceptor_impl& impl,
350 : endpoint ep,
351 : int backlog)
352 : {
353 66 : auto* epoll_impl = static_cast<epoll_acceptor_impl*>(&impl);
354 66 : epoll_impl->close_socket();
355 :
356 66 : int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
357 66 : if (fd < 0)
358 0 : return make_err(errno);
359 :
360 66 : int reuse = 1;
361 66 : ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
362 :
363 66 : sockaddr_in addr = detail::to_sockaddr_in(ep);
364 66 : if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
365 : {
366 9 : int errn = errno;
367 9 : ::close(fd);
368 9 : return make_err(errn);
369 : }
370 :
371 57 : if (::listen(fd, backlog) < 0)
372 : {
373 0 : int errn = errno;
374 0 : ::close(fd);
375 0 : return make_err(errn);
376 : }
377 :
378 57 : epoll_impl->fd_ = fd;
379 :
380 : // Register fd with epoll (edge-triggered mode)
381 57 : epoll_impl->desc_data_.fd = fd;
382 57 : epoll_impl->desc_data_.read_op.store(nullptr, std::memory_order_relaxed);
383 57 : scheduler().register_descriptor(fd, &epoll_impl->desc_data_);
384 :
385 : // Cache the local endpoint (queries OS for ephemeral port if port was 0)
386 57 : sockaddr_in local_addr{};
387 57 : socklen_t local_len = sizeof(local_addr);
388 57 : if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
389 57 : epoll_impl->set_local_endpoint(detail::from_sockaddr_in(local_addr));
390 :
391 57 : return {};
392 : }
393 :
394 : void
395 11 : epoll_acceptor_service::
396 : post(epoll_op* op)
397 : {
398 11 : state_->sched_.post(op);
399 11 : }
400 :
401 : void
402 2720 : epoll_acceptor_service::
403 : work_started() noexcept
404 : {
405 2720 : state_->sched_.work_started();
406 2720 : }
407 :
408 : void
409 9 : epoll_acceptor_service::
410 : work_finished() noexcept
411 : {
412 9 : state_->sched_.work_finished();
413 9 : }
414 :
415 : epoll_socket_service*
416 2713 : epoll_acceptor_service::
417 : socket_service() const noexcept
418 : {
419 2713 : auto* svc = ctx_.find_service<detail::socket_service>();
420 2713 : return svc ? dynamic_cast<epoll_socket_service*>(svc) : nullptr;
421 : }
422 :
423 : } // namespace boost::corosio::detail
424 :
425 : #endif // BOOST_COROSIO_HAS_EPOLL
|