LCOV - code coverage report
Current view: top level - src/detail/epoll - scheduler.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 0.0 % 2 0
Test Date: 2026-02-04 16:37:34 Functions: 0.0 % 2 0

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2026 Steve Gerbino
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/corosio
       8              : //
       9              : 
      10              : #ifndef BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP
      11              : #define BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP
      12              : 
      13              : #include <boost/corosio/detail/platform.hpp>
      14              : 
      15              : #if BOOST_COROSIO_HAS_EPOLL
      16              : 
      17              : #include <boost/corosio/detail/config.hpp>
      18              : #include <boost/corosio/detail/scheduler.hpp>
      19              : #include <boost/capy/ex/execution_context.hpp>
      20              : 
      21              : #include "src/detail/scheduler_op.hpp"
      22              : #include "src/detail/timer_service.hpp"
      23              : 
      24              : #include <atomic>
      25              : #include <condition_variable>
      26              : #include <cstddef>
      27              : #include <cstdint>
      28              : #include <mutex>
      29              : 
      30              : namespace boost::corosio::detail {
      31              : 
      32              : struct epoll_op;
      33              : struct descriptor_data;
      34              : 
      35              : /** Linux scheduler using epoll for I/O multiplexing.
      36              : 
      37              :     This scheduler implements the scheduler interface using Linux epoll
      38              :     for efficient I/O event notification. It uses a single reactor model
      39              :     where one thread runs epoll_wait while other threads
      40              :     wait on a condition variable for handler work. This design provides:
      41              : 
      42              :     - Handler parallelism: N posted handlers can execute on N threads
      43              :     - No thundering herd: condition_variable wakes exactly one thread
      44              :     - IOCP parity: Behavior matches Windows I/O completion port semantics
      45              : 
      46              :     When threads call run(), they first try to execute queued handlers.
      47              :     If the queue is empty and no reactor is running, one thread becomes
      48              :     the reactor and runs epoll_wait. Other threads wait on a condition
      49              :     variable until handlers are available.
      50              : 
      51              :     @par Thread Safety
      52              :     All public member functions are thread-safe.
      53              : */
      54              : class epoll_scheduler
      55              :     : public scheduler
      56              :     , public capy::execution_context::service
      57              : {
      58              : public:
      59              :     using key_type = scheduler;
      60              : 
      61              :     /** Construct the scheduler.
      62              : 
      63              :         Creates an epoll instance, eventfd for reactor interruption,
      64              :         and timerfd for kernel-managed timer expiry.
      65              : 
      66              :         @param ctx Reference to the owning execution_context.
      67              :         @param concurrency_hint Hint for expected thread count (unused).
      68              :     */
      69              :     epoll_scheduler(
      70              :         capy::execution_context& ctx,
      71              :         int concurrency_hint = -1);
      72              : 
      73              :     /// Destroy the scheduler.
      74              :     ~epoll_scheduler();
      75              : 
      76              :     epoll_scheduler(epoll_scheduler const&) = delete;
      77              :     epoll_scheduler& operator=(epoll_scheduler const&) = delete;
      78              : 
      79              :     void shutdown() override;
      80              :     void post(capy::coro h) const override;
      81              :     void post(scheduler_op* h) const override;
      82              :     void on_work_started() noexcept override;
      83              :     void on_work_finished() noexcept override;
      84              :     bool running_in_this_thread() const noexcept override;
      85              :     void stop() override;
      86              :     bool stopped() const noexcept override;
      87              :     void restart() override;
      88              :     std::size_t run() override;
      89              :     std::size_t run_one() override;
      90              :     std::size_t wait_one(long usec) override;
      91              :     std::size_t poll() override;
      92              :     std::size_t poll_one() override;
      93              : 
      94              :     /** Return the epoll file descriptor.
      95              : 
      96              :         Used by socket services to register file descriptors
      97              :         for I/O event notification.
      98              : 
      99              :         @return The epoll file descriptor.
     100              :     */
     101              :     int epoll_fd() const noexcept { return epoll_fd_; }
     102              : 
     103              :     /** Register a descriptor for persistent monitoring.
     104              : 
     105              :         The fd is registered once and stays registered until explicitly
     106              :         deregistered. Events are dispatched via descriptor_data which
     107              :         tracks pending read/write/connect operations.
     108              : 
     109              :         @param fd The file descriptor to register.
     110              :         @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
     111              :     */
     112              :     void register_descriptor(int fd, descriptor_data* desc) const;
     113              : 
     114              :     /** Update events for a persistently registered descriptor.
     115              : 
     116              :         @param fd The file descriptor.
     117              :         @param desc Pointer to descriptor data.
     118              :         @param events The new events to monitor.
     119              :     */
     120              :     void update_descriptor_events(int fd, descriptor_data* desc, std::uint32_t events) const;
     121              : 
     122              :     /** Deregister a persistently registered descriptor.
     123              : 
     124              :         @param fd The file descriptor to deregister.
     125              :     */
     126              :     void deregister_descriptor(int fd) const;
     127              : 
     128              :     /** For use by I/O operations to track pending work. */
     129              :     void work_started() const noexcept override;
     130              : 
     131              :     /** For use by I/O operations to track completed work. */
     132              :     void work_finished() const noexcept override;
     133              : 
     134              :     /** Drain work from thread context's private queue to global queue.
     135              : 
     136              :         Called by thread_context_guard destructor when a thread exits run().
     137              :         Transfers pending work to the global queue under mutex protection.
     138              : 
     139              :         @param queue The private queue to drain.
     140              :         @param count Item count for wakeup decisions (wakes other threads if positive).
     141              :     */
     142              :     void drain_thread_queue(op_queue& queue, long count) const;
     143              : 
     144              : private:
     145              :     std::size_t do_one(long timeout_us);
     146              :     void run_reactor(std::unique_lock<std::mutex>& lock);
     147              :     void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
     148              :     void interrupt_reactor() const;
     149              :     void update_timerfd() const;
     150              : 
     151              :     int epoll_fd_;
     152              :     int event_fd_;                              // for interrupting reactor
     153              :     int timer_fd_;                              // timerfd for kernel-managed timer expiry
     154              :     mutable std::mutex mutex_;
     155              :     mutable std::condition_variable wakeup_event_;
     156              :     mutable op_queue completed_ops_;
     157              :     mutable std::atomic<long> outstanding_work_;
     158              :     std::atomic<bool> stopped_;
     159              :     bool shutdown_;
     160              :     timer_service* timer_svc_ = nullptr;
     161              : 
     162              :     // Single reactor thread coordination
     163              :     mutable bool reactor_running_ = false;
     164              :     mutable bool reactor_interrupted_ = false;
     165              :     mutable int idle_thread_count_ = 0;
     166              : 
     167              :     // Edge-triggered eventfd state
     168              :     mutable std::atomic<bool> eventfd_armed_{false};
     169              : 
     170              : 
     171              :     // Sentinel operation for interleaving reactor runs with handler execution.
     172              :     // Ensures the reactor runs periodically even when handlers are continuously
     173              :     // posted, preventing timer starvation.
     174              :     struct task_op final : scheduler_op
     175              :     {
     176            0 :         void operator()() override {}
     177            0 :         void destroy() override {}
     178              :     };
     179              :     task_op task_op_;
     180              : };
     181              : 
     182              : } // namespace boost::corosio::detail
     183              : 
     184              : #endif // BOOST_COROSIO_HAS_EPOLL
     185              : 
     186              : #endif // BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP
        

Generated by: LCOV version 2.3