LCOV - code coverage report
Current view: top level - src/detail/epoll - sockets.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 91.2 % 34 31
Test Date: 2026-02-04 16:37:34 Functions: 85.7 % 21 18

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2026 Steve Gerbino
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/corosio
       8              : //
       9              : 
      10              : #ifndef BOOST_COROSIO_DETAIL_EPOLL_SOCKETS_HPP
      11              : #define BOOST_COROSIO_DETAIL_EPOLL_SOCKETS_HPP
      12              : 
      13              : #include <boost/corosio/detail/platform.hpp>
      14              : 
      15              : #if BOOST_COROSIO_HAS_EPOLL
      16              : 
      17              : #include <boost/corosio/detail/config.hpp>
      18              : #include <boost/corosio/tcp_socket.hpp>
      19              : #include <boost/capy/ex/executor_ref.hpp>
      20              : #include <boost/capy/ex/execution_context.hpp>
      21              : #include "src/detail/intrusive.hpp"
      22              : #include "src/detail/socket_service.hpp"
      23              : 
      24              : #include "src/detail/epoll/op.hpp"
      25              : #include "src/detail/epoll/scheduler.hpp"
      26              : 
      27              : #include <coroutine>
      28              : #include <memory>
      29              : #include <mutex>
      30              : #include <unordered_map>
      31              : 
      32              : /*
      33              :     epoll Socket Implementation
      34              :     ===========================
      35              : 
      36              :     Each I/O operation follows the same pattern:
      37              :       1. Try the syscall immediately (non-blocking socket)
      38              :       2. If it succeeds or fails with a real error, post to completion queue
      39              :       3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
      40              : 
      41              :     This "try first" approach avoids unnecessary epoll round-trips for
      42              :     operations that can complete immediately (common for small reads/writes
      43              :     on fast local connections).
      44              : 
      45              :     One-Shot Registration
      46              :     ---------------------
      47              :     We use one-shot epoll registration: each operation registers, waits for
      48              :     one event, then unregisters. This simplifies the state machine since we
      49              :     don't need to track whether an fd is currently registered or handle
      50              :     re-arming. The tradeoff is slightly more epoll_ctl calls, but the
      51              :     simplicity is worth it. NOTE(review): epoll_socket_impl::desc_data_ is documented below as state for "persistent epoll registration"; confirm which description is current.
      52              : 
      53              :     Cancellation
      54              :     ------------
      55              :     See op.hpp for the completion/cancellation race handling via the
      56              :     `registered` atomic. cancel() must complete pending operations (post
      57              :     them with the cancelled flag set) so coroutines waiting on them can resume.
      58              :     close_socket() calls cancel() first to ensure this.
      59              : 
      60              :     Impl Lifetime with shared_ptr
      61              :     -----------------------------
      62              :     Socket impls use enable_shared_from_this. The service owns impls via
      63              :     shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
      64              :     removal. When a user calls close(), we call cancel() which posts pending
      65              :     ops to the scheduler.
      66              : 
      67              :     CRITICAL: The posted ops must keep the impl alive until they complete.
      68              :     Otherwise the scheduler would process a freed op (use-after-free). The
      69              :     cancel() method captures shared_from_this() into op.impl_ptr before
      70              :     posting. When the op completes, impl_ptr is cleared, allowing the impl
      71              :     to be destroyed if no other references exist.
      72              : 
      73              :     Service Ownership
      74              :     -----------------
      75              :     epoll_socket_service owns all socket impls. destroy_impl() removes the
      76              :     shared_ptr from the map, but the impl may survive if ops still hold
      77              :     impl_ptr refs. shutdown() closes all sockets and clears the map; any
      78              :     in-flight ops will complete and release their refs.
      79              : */
      80              : 
      81              : namespace boost::corosio::detail {
      82              : 
      83              : class epoll_socket_service;
      84              : class epoll_socket_impl;
      85              : 
      86              : /** Initiator coroutine for read operations (return type of make_read_initiator).
      87              : 
      88              :     This coroutine receives control via symmetric transfer after the caller
      89              :     has fully suspended, then initiates the actual I/O. Uses cached frame
      90              :     allocation to avoid per-operation heap allocations (see promise_type::operator new).
      91              : */
      92              : struct read_initiator
      93              : {
      94              :     struct promise_type
      95              :     {
      96              :         epoll_socket_impl* impl; // no initializer in this header; presumably assigned in sockets.cpp - TODO confirm
      97              : 
      98              :         /// Reuse cached frame to avoid per-operation heap allocation: allocates into `cached` only when it is null, then always returns `cached`. The size n is used for that first allocation only; the epoll_socket_impl* argument is unused.
      99        83723 :         static void* operator new(std::size_t n, void*& cached, epoll_socket_impl*)
     100              :         {
     101        83723 :             if (!cached)
     102           40 :                 cached = ::operator new(n); // first use of this cache slot; later calls skip the allocation
     103        83723 :             return cached;
     104              :         }
     105              : 
     106              :         /// No-op: the cached frame is intentionally not released here. Per the original note it is freed in the socket destructor (not defined in this header).
     107        83723 :         static void operator delete(void*) noexcept {}
     108              : 
     109        83723 :         std::suspend_always initial_suspend() noexcept { return {}; } // always suspend at start: the body runs only once the handle is resumed
     110        83723 :         std::suspend_always final_suspend() noexcept { return {}; } // always suspend at the end: the frame is not destroyed automatically on completion
     111              : 
     112        83723 :         read_initiator get_return_object()
     113              :         {
     114        83723 :             return {std::coroutine_handle<promise_type>::from_promise(*this)}; // wrap this promise's coroutine handle in the return object
     115              :         }
     116              : 
     117        83723 :         void return_void() {} // the coroutine produces no value
     118            0 :         void unhandled_exception() { std::terminate(); } // an exception escaping the coroutine body terminates the program
     119              :     };
     120              : 
     121              :     using handle_type = std::coroutine_handle<promise_type>;
     122              :     handle_type h; // handle produced by promise_type::get_return_object()
     123              : };
     124              : 
     125              : /** Initiator coroutine for write operations (return type of make_write_initiator).
     126              : 
     127              :     This coroutine receives control via symmetric transfer after the caller
     128              :     has fully suspended, then initiates the actual I/O. Uses cached frame
     129              :     allocation to avoid per-operation heap allocations (see promise_type::operator new).
     130              : */
     131              : struct write_initiator
     132              : {
     133              :     struct promise_type
     134              :     {
     135              :         epoll_socket_impl* impl; // no initializer in this header; presumably assigned in sockets.cpp - TODO confirm
     136              : 
     137              :         /// Reuse cached frame to avoid per-operation heap allocation: allocates into `cached` only when it is null, then always returns `cached`. The size n is used for that first allocation only; the epoll_socket_impl* argument is unused.
     138        83601 :         static void* operator new(std::size_t n, void*& cached, epoll_socket_impl*)
     139              :         {
     140        83601 :             if (!cached)
     141           38 :                 cached = ::operator new(n); // first use of this cache slot; later calls skip the allocation
     142        83601 :             return cached;
     143              :         }
     144              : 
     145              :         /// No-op: the cached frame is intentionally not released here. Per the original note it is freed in the socket destructor (not defined in this header).
     146        83601 :         static void operator delete(void*) noexcept {}
     147              : 
     148        83601 :         std::suspend_always initial_suspend() noexcept { return {}; } // always suspend at start: the body runs only once the handle is resumed
     149        83601 :         std::suspend_always final_suspend() noexcept { return {}; } // always suspend at the end: the frame is not destroyed automatically on completion
     150              : 
     151        83601 :         write_initiator get_return_object()
     152              :         {
     153        83601 :             return {std::coroutine_handle<promise_type>::from_promise(*this)}; // wrap this promise's coroutine handle in the return object
     154              :         }
     155              : 
     156        83601 :         void return_void() {} // the coroutine produces no value
     157            0 :         void unhandled_exception() { std::terminate(); } // an exception escaping the coroutine body terminates the program
     158              :     };
     159              : 
     160              :     using handle_type = std::coroutine_handle<promise_type>;
     161              :     handle_type h; // handle produced by promise_type::get_return_object()
     162              : };
     163              : 
     164              : // Coroutine factory functions (defined in sockets.cpp). Their parameters (cached, impl) line up with the trailing parameters of the promise types' operator new(n, cached, impl), so `cached` is the slot that receives/reuses the coroutine frame.
     165              : read_initiator make_read_initiator(void*& cached, epoll_socket_impl* impl);
     166              : write_initiator make_write_initiator(void*& cached, epoll_socket_impl* impl);
     167              : 
     168              : /// Socket implementation for epoll backend: holds the fd, the stored local/remote endpoints, one connect/read/write op object, and the initiator coroutine frame slots and handles.
     169              : class epoll_socket_impl
     170              :     : public tcp_socket::socket_impl
     171              :     , public std::enable_shared_from_this<epoll_socket_impl>
     172              :     , public intrusive_list<epoll_socket_impl>::node
     173              : {
     174              :     friend class epoll_socket_service;
     175              : 
     176              : public:
     177              :     explicit epoll_socket_impl(epoll_socket_service& svc) noexcept;
     178              :     ~epoll_socket_impl();
     179              : 
     180              :     void release() override;
     181              : 
     182              :     void connect(
     183              :         std::coroutine_handle<>,
     184              :         capy::executor_ref,
     185              :         endpoint,
     186              :         std::stop_token,
     187              :         std::error_code*) override;
     188              : 
     189              :     std::coroutine_handle<> read_some(
     190              :         std::coroutine_handle<>,
     191              :         capy::executor_ref,
     192              :         io_buffer_param,
     193              :         std::stop_token,
     194              :         std::error_code*,
     195              :         std::size_t*) override;
     196              : 
     197              :     std::coroutine_handle<> write_some(
     198              :         std::coroutine_handle<>,
     199              :         capy::executor_ref,
     200              :         io_buffer_param,
     201              :         std::stop_token,
     202              :         std::error_code*,
     203              :         std::size_t*) override;
     204              : 
     205              :     std::error_code shutdown(tcp_socket::shutdown_type what) noexcept override;
     206              : 
     207            0 :     native_handle_type native_handle() const noexcept override { return fd_; } // returns the stored fd as-is (-1 when never set)
     208              : 
     209              :     // Socket options: each setter returns a std::error_code; each getter returns the value and takes a std::error_code& out-parameter.
     210              :     std::error_code set_no_delay(bool value) noexcept override;
     211              :     bool no_delay(std::error_code& ec) const noexcept override;
     212              : 
     213              :     std::error_code set_keep_alive(bool value) noexcept override;
     214              :     bool keep_alive(std::error_code& ec) const noexcept override;
     215              : 
     216              :     std::error_code set_receive_buffer_size(int size) noexcept override;
     217              :     int receive_buffer_size(std::error_code& ec) const noexcept override;
     218              : 
     219              :     std::error_code set_send_buffer_size(int size) noexcept override;
     220              :     int send_buffer_size(std::error_code& ec) const noexcept override;
     221              : 
     222              :     std::error_code set_linger(bool enabled, int timeout) noexcept override;
     223              :     tcp_socket::linger_options linger(std::error_code& ec) const noexcept override;
     224              : 
     225           16 :     endpoint local_endpoint() const noexcept override { return local_endpoint_; } // returns the stored copy; no syscall here
     226           16 :     endpoint remote_endpoint() const noexcept override { return remote_endpoint_; } // returns the stored copy; no syscall here
     227              :     bool is_open() const noexcept { return fd_ >= 0; } // "open" means a non-negative fd; fd_ starts at -1
     228              :     void cancel() noexcept override; // see the "Cancellation" section of the file header comment
     229              :     void cancel_single_op(epoll_op& op) noexcept;
     230              :     void close_socket() noexcept; // per the file header comment, calls cancel() first
     231              :     void update_epoll_events() noexcept;
     232         2713 :     void set_socket(int fd) noexcept { fd_ = fd; } // stores the fd only; nothing else is touched here
     233         5426 :     void set_endpoints(endpoint local, endpoint remote) noexcept
     234              :     {
     235         5426 :         local_endpoint_ = local;
     236         5426 :         remote_endpoint_ = remote;
     237         5426 :     }
     238              : 
     239              :     epoll_connect_op conn_; // one op object per operation kind; types presumably declared in op.hpp - confirm
     240              :     epoll_read_op rd_;
     241              :     epoll_write_op wr_;
     242              : 
     243              :     /// Per-descriptor state for persistent epoll registration
     244              :     descriptor_data desc_data_;
     245              : 
     246              :     void* read_initiator_frame_ = nullptr; // presumably the `cached` slot passed to make_read_initiator - confirm in sockets.cpp
     247              :     void* write_initiator_frame_ = nullptr; // presumably the `cached` slot passed to make_write_initiator - confirm in sockets.cpp
     248              :     read_initiator::handle_type read_initiator_handle_;
     249              :     write_initiator::handle_type write_initiator_handle_;
     250              : 
     251              :     /// Execute the read I/O operation (called by initiator coroutine).
     252              :     void do_read_io();
     253              : 
     254              :     /// Execute the write I/O operation (called by initiator coroutine).
     255              :     void do_write_io();
     256              : 
     257              : private:
     258              :     epoll_socket_service& svc_; // owning service, bound at construction
     259              :     int fd_ = -1; // -1 means no socket assigned (see is_open())
     260              :     endpoint local_endpoint_; // written by set_endpoints()
     261              :     endpoint remote_endpoint_; // written by set_endpoints()
     262              : };
     263              : 
     264              : /** State for epoll socket service: the scheduler reference, a mutex, and the socket containers. Held by epoll_socket_service through a unique_ptr. */
     265              : class epoll_socket_state
     266              : {
     267              : public:
     268          184 :     explicit epoll_socket_state(epoll_scheduler& sched) noexcept
     269          184 :         : sched_(sched)
     270              :     {
     271          184 :     }
     272              : 
     273              :     epoll_scheduler& sched_; // reference only; the scheduler is not owned here
     274              :     std::mutex mutex_; // presumably guards socket_list_ and socket_ptrs_ - confirm in sockets.cpp
     275              :     intrusive_list<epoll_socket_impl> socket_list_; // epoll_socket_impl derives from this list's node type
     276              :     std::unordered_map<epoll_socket_impl*, std::shared_ptr<epoll_socket_impl>> socket_ptrs_; // owning shared_ptrs keyed by raw impl pointer (see file header comment)
     277              : };
     278              : 
     279              : /** epoll socket service implementation.
     280              : 
     281              :     Inherits from socket_service to enable runtime polymorphism.
     282              :     Uses key_type = socket_service for service lookup.
     283              : */
     284              : class epoll_socket_service : public socket_service
     285              : {
     286              : public:
     287              :     explicit epoll_socket_service(capy::execution_context& ctx);
     288              :     ~epoll_socket_service();
     289              : 
     290              :     epoll_socket_service(epoll_socket_service const&) = delete; // non-copyable
     291              :     epoll_socket_service& operator=(epoll_socket_service const&) = delete;
     292              : 
     293              :     void shutdown() override; // per the file header comment: closes all sockets and clears the map
     294              : 
     295              :     tcp_socket::socket_impl& create_impl() override;
     296              :     void destroy_impl(tcp_socket::socket_impl& impl) override; // per the file header comment: removes the shared_ptr from the map; the impl may outlive this call
     297              :     std::error_code open_socket(tcp_socket::socket_impl& impl) override;
     298              : 
     299        10878 :     epoll_scheduler& scheduler() const noexcept { return state_->sched_; } // scheduler reference stored in state_
     300              :     void post(epoll_op* op);
     301              :     void work_started() noexcept;
     302              :     void work_finished() noexcept;
     303              : 
     304              : private:
     305              :     std::unique_ptr<epoll_socket_state> state_; // scheduler reference, mutex and socket containers live behind this pointer
     306              : };
     307              : 
     308              : // Backward compatibility alias: epoll_sockets names the same type as epoll_socket_service.
     309              : using epoll_sockets = epoll_socket_service;
     310              : 
     311              : } // namespace boost::corosio::detail
     312              : 
     313              : #endif // BOOST_COROSIO_HAS_EPOLL
     314              : 
     315              : #endif // BOOST_COROSIO_DETAIL_EPOLL_SOCKETS_HPP
        

Generated by: LCOV version 2.3