LCOV - code coverage report
Current view: top level - src/detail/select - acceptors.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 64.9 % 248 161
Test Date: 2026-02-04 16:37:34 Functions: 89.5 % 19 17

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2026 Steve Gerbino
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/corosio
       8              : //
       9              : 
      10              : #include <boost/corosio/detail/platform.hpp>
      11              : 
      12              : #if BOOST_COROSIO_HAS_SELECT
      13              : 
      14              : #include "src/detail/select/acceptors.hpp"
      15              : #include "src/detail/select/sockets.hpp"
      16              : #include "src/detail/endpoint_convert.hpp"
      17              : #include "src/detail/make_err.hpp"
      18              : 
      19              : #include <errno.h>
      20              : #include <fcntl.h>
      21              : #include <netinet/in.h>
      22              : #include <sys/socket.h>
      23              : #include <unistd.h>
      24              : 
      25              : namespace boost::corosio::detail {
      26              : 
      27              : void
      28            0 : select_accept_op::
      29              : cancel() noexcept
      30              : {
      31            0 :     if (acceptor_impl_)
      32            0 :         acceptor_impl_->cancel_single_op(*this);
      33              :     else
      34            0 :         request_cancel();
      35            0 : }
      36              : 
      37              : void
      38         2181 : select_accept_op::
      39              : operator()()
      40              : {
      41         2181 :     stop_cb.reset();
      42              : 
      43         2181 :     bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
      44              : 
      45         2181 :     if (ec_out)
      46              :     {
      47         2181 :         if (cancelled.load(std::memory_order_acquire))
      48            3 :             *ec_out = capy::error::canceled;
      49         2178 :         else if (errn != 0)
      50            0 :             *ec_out = make_err(errn);
      51              :         else
      52         2178 :             *ec_out = {};
      53              :     }
      54              : 
      55         2181 :     if (success && accepted_fd >= 0)
      56              :     {
      57         2178 :         if (acceptor_impl_)
      58              :         {
      59         2178 :             auto* socket_svc = static_cast<select_acceptor_impl*>(acceptor_impl_)
      60         2178 :                 ->service().socket_service();
      61         2178 :             if (socket_svc)
      62              :             {
      63         2178 :                 auto& impl = static_cast<select_socket_impl&>(socket_svc->create_impl());
      64         2178 :                 impl.set_socket(accepted_fd);
      65              : 
      66         2178 :                 sockaddr_in local_addr{};
      67         2178 :                 socklen_t local_len = sizeof(local_addr);
      68         2178 :                 sockaddr_in remote_addr{};
      69         2178 :                 socklen_t remote_len = sizeof(remote_addr);
      70              : 
      71         2178 :                 endpoint local_ep, remote_ep;
      72         2178 :                 if (::getsockname(accepted_fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
      73         2178 :                     local_ep = from_sockaddr_in(local_addr);
      74         2178 :                 if (::getpeername(accepted_fd, reinterpret_cast<sockaddr*>(&remote_addr), &remote_len) == 0)
      75         2178 :                     remote_ep = from_sockaddr_in(remote_addr);
      76              : 
      77         2178 :                 impl.set_endpoints(local_ep, remote_ep);
      78              : 
      79         2178 :                 if (impl_out)
      80         2178 :                     *impl_out = &impl;
      81              : 
      82         2178 :                 accepted_fd = -1;
      83              :             }
      84              :             else
      85              :             {
      86            0 :                 if (ec_out && !*ec_out)
      87            0 :                     *ec_out = make_err(ENOENT);
      88            0 :                 ::close(accepted_fd);
      89            0 :                 accepted_fd = -1;
      90            0 :                 if (impl_out)
      91            0 :                     *impl_out = nullptr;
      92              :             }
      93              :         }
      94              :         else
      95              :         {
      96            0 :             ::close(accepted_fd);
      97            0 :             accepted_fd = -1;
      98            0 :             if (impl_out)
      99            0 :                 *impl_out = nullptr;
     100              :         }
     101         2178 :     }
     102              :     else
     103              :     {
     104            3 :         if (accepted_fd >= 0)
     105              :         {
     106            0 :             ::close(accepted_fd);
     107            0 :             accepted_fd = -1;
     108              :         }
     109              : 
     110            3 :         if (peer_impl)
     111              :         {
     112            0 :             peer_impl->release();
     113            0 :             peer_impl = nullptr;
     114              :         }
     115              : 
     116            3 :         if (impl_out)
     117            3 :             *impl_out = nullptr;
     118              :     }
     119              : 
     120              :     // Move to stack before destroying the frame
     121         2181 :     capy::executor_ref saved_ex( std::move( ex ) );
     122         2181 :     capy::coro saved_h( std::move( h ) );
     123         2181 :     impl_ptr.reset();
     124         2181 :     saved_ex.dispatch( saved_h );
     125         2181 : }
     126              : 
     127           49 : select_acceptor_impl::
     128           49 : select_acceptor_impl(select_acceptor_service& svc) noexcept
     129           49 :     : svc_(svc)
     130              : {
     131           49 : }
     132              : 
     133              : void
     134           49 : select_acceptor_impl::
     135              : release()
     136              : {
     137           49 :     close_socket();
     138           49 :     svc_.destroy_acceptor_impl(*this);
     139           49 : }
     140              : 
     141              : void
     142         2181 : select_acceptor_impl::
     143              : accept(
     144              :     std::coroutine_handle<> h,
     145              :     capy::executor_ref ex,
     146              :     std::stop_token token,
     147              :     std::error_code* ec,
     148              :     io_object::io_object_impl** impl_out)
     149              : {
     150         2181 :     auto& op = acc_;
     151         2181 :     op.reset();
     152         2181 :     op.h = h;
     153         2181 :     op.ex = ex;
     154         2181 :     op.ec_out = ec;
     155         2181 :     op.impl_out = impl_out;
     156         2181 :     op.fd = fd_;
     157         2181 :     op.start(token, this);
     158              : 
     159         2181 :     sockaddr_in addr{};
     160         2181 :     socklen_t addrlen = sizeof(addr);
     161         2181 :     int accepted = ::accept(fd_, reinterpret_cast<sockaddr*>(&addr), &addrlen);
     162              : 
     163         2181 :     if (accepted >= 0)
     164              :     {
     165              :         // Reject fds that exceed select()'s FD_SETSIZE limit.
     166              :         // Better to fail now than during later async operations.
     167            2 :         if (accepted >= FD_SETSIZE)
     168              :         {
     169            0 :             ::close(accepted);
     170            0 :             op.accepted_fd = -1;
     171            0 :             op.complete(EINVAL, 0);
     172            0 :             op.impl_ptr = shared_from_this();
     173            0 :             svc_.post(&op);
     174         2181 :             return;
     175              :         }
     176              : 
     177              :         // Set non-blocking and close-on-exec flags.
     178              :         // A non-blocking socket is essential for the async reactor;
     179              :         // if we can't configure it, fail rather than risk blocking.
     180            2 :         int flags = ::fcntl(accepted, F_GETFL, 0);
     181            2 :         if (flags == -1)
     182              :         {
     183            0 :             int err = errno;
     184            0 :             ::close(accepted);
     185            0 :             op.accepted_fd = -1;
     186            0 :             op.complete(err, 0);
     187            0 :             op.impl_ptr = shared_from_this();
     188            0 :             svc_.post(&op);
     189            0 :             return;
     190              :         }
     191              : 
     192            2 :         if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
     193              :         {
     194            0 :             int err = errno;
     195            0 :             ::close(accepted);
     196            0 :             op.accepted_fd = -1;
     197            0 :             op.complete(err, 0);
     198            0 :             op.impl_ptr = shared_from_this();
     199            0 :             svc_.post(&op);
     200            0 :             return;
     201              :         }
     202              : 
     203            2 :         if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
     204              :         {
     205            0 :             int err = errno;
     206            0 :             ::close(accepted);
     207            0 :             op.accepted_fd = -1;
     208            0 :             op.complete(err, 0);
     209            0 :             op.impl_ptr = shared_from_this();
     210            0 :             svc_.post(&op);
     211            0 :             return;
     212              :         }
     213              : 
     214            2 :         op.accepted_fd = accepted;
     215            2 :         op.complete(0, 0);
     216            2 :         op.impl_ptr = shared_from_this();
     217            2 :         svc_.post(&op);
     218            2 :         return;
     219              :     }
     220              : 
     221         2179 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     222              :     {
     223         2179 :         svc_.work_started();
     224         2179 :         op.impl_ptr = shared_from_this();
     225              : 
     226              :         // Set registering BEFORE register_fd to close the race window where
     227              :         // reactor sees an event before we set registered.
     228         2179 :         op.registered.store(select_registration_state::registering, std::memory_order_release);
     229         2179 :         svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
     230              : 
     231              :         // Transition to registered. If this fails, reactor or cancel already
     232              :         // claimed the op (state is now unregistered), so we're done. However,
     233              :         // we must still deregister the fd because cancel's deregister_fd may
     234              :         // have run before our register_fd, leaving the fd orphaned.
     235         2179 :         auto expected = select_registration_state::registering;
     236         2179 :         if (!op.registered.compare_exchange_strong(
     237              :                 expected, select_registration_state::registered, std::memory_order_acq_rel))
     238              :         {
     239            0 :             svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
     240            0 :             return;
     241              :         }
     242              : 
     243              :         // If cancelled was set before we registered, handle it now.
     244         2179 :         if (op.cancelled.load(std::memory_order_acquire))
     245              :         {
     246            0 :             auto prev = op.registered.exchange(
     247              :                 select_registration_state::unregistered, std::memory_order_acq_rel);
     248            0 :             if (prev != select_registration_state::unregistered)
     249              :             {
     250            0 :                 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
     251            0 :                 op.impl_ptr = shared_from_this();
     252            0 :                 svc_.post(&op);
     253            0 :                 svc_.work_finished();
     254              :             }
     255              :         }
     256         2179 :         return;
     257              :     }
     258              : 
     259            0 :     op.complete(errno, 0);
     260            0 :     op.impl_ptr = shared_from_this();
     261            0 :     svc_.post(&op);
     262              : }
     263              : 
     264              : void
     265           99 : select_acceptor_impl::
     266              : cancel() noexcept
     267              : {
     268           99 :     std::shared_ptr<select_acceptor_impl> self;
     269              :     try {
     270           99 :         self = shared_from_this();
     271            0 :     } catch (const std::bad_weak_ptr&) {
     272            0 :         return;
     273            0 :     }
     274              : 
     275           99 :     auto prev = acc_.registered.exchange(
     276              :         select_registration_state::unregistered, std::memory_order_acq_rel);
     277           99 :     acc_.request_cancel();
     278              : 
     279           99 :     if (prev != select_registration_state::unregistered)
     280              :     {
     281            3 :         svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
     282            3 :         acc_.impl_ptr = self;
     283            3 :         svc_.post(&acc_);
     284            3 :         svc_.work_finished();
     285              :     }
     286           99 : }
     287              : 
     288              : void
     289            0 : select_acceptor_impl::
     290              : cancel_single_op(select_op& op) noexcept
     291              : {
     292              :     // Called from stop_token callback to cancel a specific pending operation.
     293            0 :     auto prev = op.registered.exchange(
     294              :         select_registration_state::unregistered, std::memory_order_acq_rel);
     295            0 :     op.request_cancel();
     296              : 
     297            0 :     if (prev != select_registration_state::unregistered)
     298              :     {
     299            0 :         svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
     300              : 
     301              :         // Keep impl alive until op completes
     302              :         try {
     303            0 :             op.impl_ptr = shared_from_this();
     304            0 :         } catch (const std::bad_weak_ptr&) {
     305              :             // Impl is being destroyed, op will be orphaned but that's ok
     306            0 :         }
     307              : 
     308            0 :         svc_.post(&op);
     309            0 :         svc_.work_finished();
     310              :     }
     311            0 : }
     312              : 
     313              : void
     314           98 : select_acceptor_impl::
     315              : close_socket() noexcept
     316              : {
     317           98 :     cancel();
     318              : 
     319           98 :     if (fd_ >= 0)
     320              :     {
     321              :         // Unconditionally remove from registered_fds_ to handle edge cases
     322           42 :         svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
     323           42 :         ::close(fd_);
     324           42 :         fd_ = -1;
     325              :     }
     326              : 
     327              :     // Clear cached endpoint
     328           98 :     local_endpoint_ = endpoint{};
     329           98 : }
     330              : 
     331          120 : select_acceptor_service::
     332          120 : select_acceptor_service(capy::execution_context& ctx)
     333          120 :     : ctx_(ctx)
     334          120 :     , state_(std::make_unique<select_acceptor_state>(ctx.use_service<select_scheduler>()))
     335              : {
     336          120 : }
     337              : 
     338          240 : select_acceptor_service::
     339          120 : ~select_acceptor_service()
     340              : {
     341          240 : }
     342              : 
     343              : void
     344          120 : select_acceptor_service::
     345              : shutdown()
     346              : {
     347          120 :     std::lock_guard lock(state_->mutex_);
     348              : 
     349          120 :     while (auto* impl = state_->acceptor_list_.pop_front())
     350            0 :         impl->close_socket();
     351              : 
     352          120 :     state_->acceptor_ptrs_.clear();
     353          120 : }
     354              : 
     355              : tcp_acceptor::acceptor_impl&
     356           49 : select_acceptor_service::
     357              : create_acceptor_impl()
     358              : {
     359           49 :     auto impl = std::make_shared<select_acceptor_impl>(*this);
     360           49 :     auto* raw = impl.get();
     361              : 
     362           49 :     std::lock_guard lock(state_->mutex_);
     363           49 :     state_->acceptor_list_.push_back(raw);
     364           49 :     state_->acceptor_ptrs_.emplace(raw, std::move(impl));
     365              : 
     366           49 :     return *raw;
     367           49 : }
     368              : 
     369              : void
     370           49 : select_acceptor_service::
     371              : destroy_acceptor_impl(tcp_acceptor::acceptor_impl& impl)
     372              : {
     373           49 :     auto* select_impl = static_cast<select_acceptor_impl*>(&impl);
     374           49 :     std::lock_guard lock(state_->mutex_);
     375           49 :     state_->acceptor_list_.remove(select_impl);
     376           49 :     state_->acceptor_ptrs_.erase(select_impl);
     377           49 : }
     378              : 
     379              : std::error_code
     380           49 : select_acceptor_service::
     381              : open_acceptor(
     382              :     tcp_acceptor::acceptor_impl& impl,
     383              :     endpoint ep,
     384              :     int backlog)
     385              : {
     386           49 :     auto* select_impl = static_cast<select_acceptor_impl*>(&impl);
     387           49 :     select_impl->close_socket();
     388              : 
     389           49 :     int fd = ::socket(AF_INET, SOCK_STREAM, 0);
     390           49 :     if (fd < 0)
     391            0 :         return make_err(errno);
     392              : 
     393              :     // Set non-blocking and close-on-exec
     394           49 :     int flags = ::fcntl(fd, F_GETFL, 0);
     395           49 :     if (flags == -1)
     396              :     {
     397            0 :         int errn = errno;
     398            0 :         ::close(fd);
     399            0 :         return make_err(errn);
     400              :     }
     401           49 :     if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
     402              :     {
     403            0 :         int errn = errno;
     404            0 :         ::close(fd);
     405            0 :         return make_err(errn);
     406              :     }
     407           49 :     if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
     408              :     {
     409            0 :         int errn = errno;
     410            0 :         ::close(fd);
     411            0 :         return make_err(errn);
     412              :     }
     413              : 
     414              :     // Check fd is within select() limits
     415           49 :     if (fd >= FD_SETSIZE)
     416              :     {
     417            0 :         ::close(fd);
     418            0 :         return make_err(EMFILE);
     419              :     }
     420              : 
     421           49 :     int reuse = 1;
     422           49 :     ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
     423              : 
     424           49 :     sockaddr_in addr = detail::to_sockaddr_in(ep);
     425           49 :     if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
     426              :     {
     427            7 :         int errn = errno;
     428            7 :         ::close(fd);
     429            7 :         return make_err(errn);
     430              :     }
     431              : 
     432           42 :     if (::listen(fd, backlog) < 0)
     433              :     {
     434            0 :         int errn = errno;
     435            0 :         ::close(fd);
     436            0 :         return make_err(errn);
     437              :     }
     438              : 
     439           42 :     select_impl->fd_ = fd;
     440              : 
     441              :     // Cache the local endpoint (queries OS for ephemeral port if port was 0)
     442           42 :     sockaddr_in local_addr{};
     443           42 :     socklen_t local_len = sizeof(local_addr);
     444           42 :     if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
     445           42 :         select_impl->set_local_endpoint(detail::from_sockaddr_in(local_addr));
     446              : 
     447           42 :     return {};
     448              : }
     449              : 
     450              : void
     451            5 : select_acceptor_service::
     452              : post(select_op* op)
     453              : {
     454            5 :     state_->sched_.post(op);
     455            5 : }
     456              : 
     457              : void
     458         2179 : select_acceptor_service::
     459              : work_started() noexcept
     460              : {
     461         2179 :     state_->sched_.work_started();
     462         2179 : }
     463              : 
     464              : void
     465            3 : select_acceptor_service::
     466              : work_finished() noexcept
     467              : {
     468            3 :     state_->sched_.work_finished();
     469            3 : }
     470              : 
     471              : select_socket_service*
     472         2178 : select_acceptor_service::
     473              : socket_service() const noexcept
     474              : {
     475         2178 :     auto* svc = ctx_.find_service<detail::socket_service>();
     476         2178 :     return svc ? dynamic_cast<select_socket_service*>(svc) : nullptr;
     477              : }
     478              : 
     479              : } // namespace boost::corosio::detail
     480              : 
     481              : #endif // BOOST_COROSIO_HAS_SELECT
        

Generated by: LCOV version 2.3