LCOV - code coverage report
Current view: top level - src/detail/select - op.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 74.8 % 131 98
Test Date: 2026-02-04 16:37:34 Functions: 84.2 % 19 16

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2026 Steve Gerbino
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/corosio
       8              : //
       9              : 
      10              : #ifndef BOOST_COROSIO_DETAIL_SELECT_OP_HPP
      11              : #define BOOST_COROSIO_DETAIL_SELECT_OP_HPP
      12              : 
      13              : #include <boost/corosio/detail/platform.hpp>
      14              : 
      15              : #if BOOST_COROSIO_HAS_SELECT
      16              : 
      17              : #include <boost/corosio/detail/config.hpp>
      18              : #include <boost/corosio/io_object.hpp>
      19              : #include <boost/corosio/endpoint.hpp>
      20              : #include <boost/capy/ex/executor_ref.hpp>
      21              : #include <boost/capy/coro.hpp>
      22              : #include <boost/capy/error.hpp>
      23              : #include <system_error>
      24              : 
      25              : #include "src/detail/make_err.hpp"
      26              : #include "src/detail/scheduler_op.hpp"
      27              : #include "src/detail/endpoint_convert.hpp"
      28              : 
      29              : #include <unistd.h>
      30              : #include <errno.h>
      31              : #include <fcntl.h>
      32              : 
      33              : #include <atomic>
      34              : #include <cstddef>
      35              : #include <memory>
      36              : #include <optional>
      37              : #include <stop_token>
      38              : 
      39              : #include <netinet/in.h>
      40              : #include <sys/select.h>
      41              : #include <sys/socket.h>
      42              : #include <sys/uio.h>
      43              : 
      44              : /*
      45              :     select Operation State
      46              :     ======================
      47              : 
      48              :     Each async I/O operation has a corresponding select_op-derived struct that
      49              :     holds the operation's state while it's in flight. The socket impl owns
      50              :     fixed slots for each operation type (conn_, rd_, wr_), so only one
      51              :     operation of each type can be pending per socket at a time.
      52              : 
      53              :     This mirrors the epoll_op design for consistency across backends.
      54              : 
      55              :     Completion vs Cancellation Race
      56              :     -------------------------------
      57              :     The `registered` atomic uses a tri-state (unregistered, registering,
      58              :     registered) to handle two races: (1) between register_fd() and the
      59              :     reactor seeing an event, and (2) between reactor completion and cancel().
      60              : 
      61              :     The registering state closes the window where an event could arrive
      62              :     after register_fd() but before the boolean was set. The reactor and
      63              :     cancel() both treat registering the same as registered when claiming.
      64              : 
      65              :     Whoever atomically exchanges to unregistered "claims" the operation
      66              :     and is responsible for completing it. The loser sees unregistered and
      67              :     does nothing. The initiating thread uses compare_exchange to transition
      68              :     from registering to registered; if this fails, the reactor or cancel
      69              :     already claimed the op.
      70              : 
      71              :     Impl Lifetime Management
      72              :     ------------------------
      73              :     When cancel() posts an op to the scheduler's ready queue, the socket impl
      74              :     might be destroyed before the scheduler processes the op. The `impl_ptr`
      75              :     member holds a shared_ptr to the impl, keeping it alive until the op
      76              :     completes.
      77              : 
      78              :     EOF Detection
      79              :     -------------
      80              :     For reads, 0 bytes with no error means EOF. But an empty user buffer also
      81              :     returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
      82              : 
      83              :     SIGPIPE Prevention
      84              :     ------------------
      85              :     Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
      86              :     SIGPIPE when the peer has closed.
      87              : */
      88              : 
      89              : namespace boost::corosio::detail {
      90              : 
      91              : // Forward declarations for cancellation support
      92              : class select_socket_impl;
      93              : class select_acceptor_impl;
      94              : 
      95              : /** Registration state for async operations.
      96              : 
      97              :     Tri-state enum to handle the race between register_fd() and
      98              :     run_reactor() seeing an event. Setting REGISTERING before
      99              :     calling register_fd() ensures events delivered during the
     100              :     registration window are not dropped.
     101              : */
     102              : enum class select_registration_state : std::uint8_t
     103              : {
     104              :     unregistered,  ///< Not registered with reactor
     105              :     registering,   ///< register_fd() called, not yet confirmed
     106              :     registered     ///< Fully registered, ready for events
     107              : };
     108              : 
     109              : struct select_op : scheduler_op
     110              : {
     111              :     struct canceller
     112              :     {
     113              :         select_op* op;
     114              :         void operator()() const noexcept;
     115              :     };
     116              : 
     117              :     capy::coro h;
     118              :     capy::executor_ref ex;
     119              :     std::error_code* ec_out = nullptr;
     120              :     std::size_t* bytes_out = nullptr;
     121              : 
     122              :     int fd = -1;
     123              :     int errn = 0;
     124              :     std::size_t bytes_transferred = 0;
     125              : 
     126              :     std::atomic<bool> cancelled{false};
     127              :     std::atomic<select_registration_state> registered{select_registration_state::unregistered};
     128              :     std::optional<std::stop_callback<canceller>> stop_cb;
     129              : 
     130              :     // Prevents use-after-free when socket is closed with pending ops.
     131              :     std::shared_ptr<void> impl_ptr;
     132              : 
     133              :     // For stop_token cancellation - pointer to owning socket/acceptor impl.
     134              :     select_socket_impl* socket_impl_ = nullptr;
     135              :     select_acceptor_impl* acceptor_impl_ = nullptr;
     136              : 
     137        13156 :     select_op()
     138        13156 :     {
     139        13156 :         data_ = this;
     140        13156 :     }
     141              : 
     142       167235 :     void reset() noexcept
     143              :     {
     144       167235 :         fd = -1;
     145       167235 :         errn = 0;
     146       167235 :         bytes_transferred = 0;
     147       167235 :         cancelled.store(false, std::memory_order_relaxed);
     148       167235 :         registered.store(select_registration_state::unregistered, std::memory_order_relaxed);
     149       167235 :         impl_ptr.reset();
     150       167235 :         socket_impl_ = nullptr;
     151       167235 :         acceptor_impl_ = nullptr;
     152       167235 :     }
     153              : 
     154       162874 :     void operator()() override
     155              :     {
     156       162874 :         stop_cb.reset();
     157              : 
     158       162874 :         if (ec_out)
     159              :         {
     160       162874 :             if (cancelled.load(std::memory_order_acquire))
     161          201 :                 *ec_out = capy::error::canceled;
     162       162673 :             else if (errn != 0)
     163            1 :                 *ec_out = make_err(errn);
     164       162672 :             else if (is_read_operation() && bytes_transferred == 0)
     165            5 :                 *ec_out = capy::error::eof;
     166              :             else
     167       162667 :                 *ec_out = {};
     168              :         }
     169              : 
     170       162874 :         if (bytes_out)
     171       162874 :             *bytes_out = bytes_transferred;
     172              : 
     173              :         // Move to stack before destroying the frame
     174       162874 :         capy::executor_ref saved_ex( std::move( ex ) );
     175       162874 :         capy::coro saved_h( std::move( h ) );
     176       162874 :         impl_ptr.reset();
     177       162874 :         saved_ex.dispatch( saved_h );
     178       162874 :     }
     179              : 
     180        81372 :     virtual bool is_read_operation() const noexcept { return false; }
     181              :     virtual void cancel() noexcept = 0;
     182              : 
     183            0 :     void destroy() override
     184              :     {
     185            0 :         stop_cb.reset();
     186            0 :         impl_ptr.reset();
     187            0 :     }
     188              : 
     189        20159 :     void request_cancel() noexcept
     190              :     {
     191        20159 :         cancelled.store(true, std::memory_order_release);
     192        20159 :     }
     193              : 
     194              :     void start(std::stop_token token)
     195              :     {
     196              :         cancelled.store(false, std::memory_order_release);
     197              :         stop_cb.reset();
     198              :         socket_impl_ = nullptr;
     199              :         acceptor_impl_ = nullptr;
     200              : 
     201              :         if (token.stop_possible())
     202              :             stop_cb.emplace(token, canceller{this});
     203              :     }
     204              : 
     205       165054 :     void start(std::stop_token token, select_socket_impl* impl)
     206              :     {
     207       165054 :         cancelled.store(false, std::memory_order_release);
     208       165054 :         stop_cb.reset();
     209       165054 :         socket_impl_ = impl;
     210       165054 :         acceptor_impl_ = nullptr;
     211              : 
     212       165054 :         if (token.stop_possible())
     213           99 :             stop_cb.emplace(token, canceller{this});
     214       165054 :     }
     215              : 
     216         2181 :     void start(std::stop_token token, select_acceptor_impl* impl)
     217              :     {
     218         2181 :         cancelled.store(false, std::memory_order_release);
     219         2181 :         stop_cb.reset();
     220         2181 :         socket_impl_ = nullptr;
     221         2181 :         acceptor_impl_ = impl;
     222              : 
     223         2181 :         if (token.stop_possible())
     224            0 :             stop_cb.emplace(token, canceller{this});
     225         2181 :     }
     226              : 
     227       167116 :     void complete(int err, std::size_t bytes) noexcept
     228              :     {
     229       167116 :         errn = err;
     230       167116 :         bytes_transferred = bytes;
     231       167116 :     }
     232              : 
     233            0 :     virtual void perform_io() noexcept {}
     234              : };
     235              : 
     236              : 
     237              : struct select_connect_op : select_op
     238              : {
     239              :     endpoint target_endpoint;
     240              : 
     241         2180 :     void reset() noexcept
     242              :     {
     243         2180 :         select_op::reset();
     244         2180 :         target_endpoint = endpoint{};
     245         2180 :     }
     246              : 
     247         2180 :     void perform_io() noexcept override
     248              :     {
     249              :         // connect() completion status is retrieved via SO_ERROR, not return value
     250         2180 :         int err = 0;
     251         2180 :         socklen_t len = sizeof(err);
     252         2180 :         if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
     253            0 :             err = errno;
     254         2180 :         complete(err, 0);
     255         2180 :     }
     256              : 
     257              :     // Defined in sockets.cpp where select_socket_impl is complete
     258              :     void operator()() override;
     259              :     void cancel() noexcept override;
     260              : };
     261              : 
     262              : 
     263              : struct select_read_op : select_op
     264              : {
     265              :     static constexpr std::size_t max_buffers = 16;
     266              :     iovec iovecs[max_buffers];
     267              :     int iovec_count = 0;
     268              :     bool empty_buffer_read = false;
     269              : 
     270        81300 :     bool is_read_operation() const noexcept override
     271              :     {
     272        81300 :         return !empty_buffer_read;
     273              :     }
     274              : 
     275        81497 :     void reset() noexcept
     276              :     {
     277        81497 :         select_op::reset();
     278        81497 :         iovec_count = 0;
     279        81497 :         empty_buffer_read = false;
     280        81497 :     }
     281              : 
     282           48 :     void perform_io() noexcept override
     283              :     {
     284           48 :         ssize_t n = ::readv(fd, iovecs, iovec_count);
     285           48 :         if (n >= 0)
     286           48 :             complete(0, static_cast<std::size_t>(n));
     287              :         else
     288            0 :             complete(errno, 0);
     289           48 :     }
     290              : 
     291              :     void cancel() noexcept override;
     292              : };
     293              : 
     294              : 
     295              : struct select_write_op : select_op
     296              : {
     297              :     static constexpr std::size_t max_buffers = 16;
     298              :     iovec iovecs[max_buffers];
     299              :     int iovec_count = 0;
     300              : 
     301        81377 :     void reset() noexcept
     302              :     {
     303        81377 :         select_op::reset();
     304        81377 :         iovec_count = 0;
     305        81377 :     }
     306              : 
     307            0 :     void perform_io() noexcept override
     308              :     {
     309            0 :         msghdr msg{};
     310            0 :         msg.msg_iov = iovecs;
     311            0 :         msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
     312              : 
     313            0 :         ssize_t n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
     314            0 :         if (n >= 0)
     315            0 :             complete(0, static_cast<std::size_t>(n));
     316              :         else
     317            0 :             complete(errno, 0);
     318            0 :     }
     319              : 
     320              :     void cancel() noexcept override;
     321              : };
     322              : 
     323              : 
     324              : struct select_accept_op : select_op
     325              : {
     326              :     int accepted_fd = -1;
     327              :     io_object::io_object_impl* peer_impl = nullptr;
     328              :     io_object::io_object_impl** impl_out = nullptr;
     329              : 
     330         2181 :     void reset() noexcept
     331              :     {
     332         2181 :         select_op::reset();
     333         2181 :         accepted_fd = -1;
     334         2181 :         peer_impl = nullptr;
     335         2181 :         impl_out = nullptr;
     336         2181 :     }
     337              : 
     338         2176 :     void perform_io() noexcept override
     339              :     {
     340         2176 :         sockaddr_in addr{};
     341         2176 :         socklen_t addrlen = sizeof(addr);
     342              : 
     343              :         // Note: select backend uses accept() + fcntl instead of accept4()
     344              :         // for broader POSIX compatibility
     345         2176 :         int new_fd = ::accept(fd, reinterpret_cast<sockaddr*>(&addr), &addrlen);
     346              : 
     347         2176 :         if (new_fd >= 0)
     348              :         {
     349              :             // Reject fds that exceed select()'s FD_SETSIZE limit.
     350              :             // Better to fail now than during later async operations.
     351         2176 :             if (new_fd >= FD_SETSIZE)
     352              :             {
     353            0 :                 ::close(new_fd);
     354            0 :                 complete(EINVAL, 0);
     355            0 :                 return;
     356              :             }
     357              : 
     358              :             // Set non-blocking and close-on-exec flags.
     359              :             // A non-blocking socket is essential for the async reactor;
     360              :             // if we can't configure it, fail rather than risk blocking.
     361         2176 :             int flags = ::fcntl(new_fd, F_GETFL, 0);
     362         2176 :             if (flags == -1)
     363              :             {
     364            0 :                 int err = errno;
     365            0 :                 ::close(new_fd);
     366            0 :                 complete(err, 0);
     367            0 :                 return;
     368              :             }
     369              : 
     370         2176 :             if (::fcntl(new_fd, F_SETFL, flags | O_NONBLOCK) == -1)
     371              :             {
     372            0 :                 int err = errno;
     373            0 :                 ::close(new_fd);
     374            0 :                 complete(err, 0);
     375            0 :                 return;
     376              :             }
     377              : 
     378         2176 :             if (::fcntl(new_fd, F_SETFD, FD_CLOEXEC) == -1)
     379              :             {
     380            0 :                 int err = errno;
     381            0 :                 ::close(new_fd);
     382            0 :                 complete(err, 0);
     383            0 :                 return;
     384              :             }
     385              : 
     386         2176 :             accepted_fd = new_fd;
     387         2176 :             complete(0, 0);
     388              :         }
     389              :         else
     390              :         {
     391            0 :             complete(errno, 0);
     392              :         }
     393              :     }
     394              : 
     395              :     // Defined in acceptors.cpp where select_acceptor_impl is complete
     396              :     void operator()() override;
     397              :     void cancel() noexcept override;
     398              : };
     399              : 
     400              : } // namespace boost::corosio::detail
     401              : 
     402              : #endif // BOOST_COROSIO_HAS_SELECT
     403              : 
     404              : #endif // BOOST_COROSIO_DETAIL_SELECT_OP_HPP
        

Generated by: LCOV version 2.3