LCOV - code coverage report
Current view: top level - src/detail/select - scheduler.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 73.4 % 365 268
Test Date: 2026-02-04 16:37:34 Functions: 88.6 % 35 31

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2026 Steve Gerbino
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/corosio
       8              : //
       9              : 
      10              : #include <boost/corosio/detail/platform.hpp>
      11              : 
      12              : #if BOOST_COROSIO_HAS_SELECT
      13              : 
      14              : #include "src/detail/select/scheduler.hpp"
      15              : #include "src/detail/select/op.hpp"
      16              : #include "src/detail/make_err.hpp"
      17              : #include "src/detail/posix/resolver_service.hpp"
      18              : #include "src/detail/posix/signals.hpp"
      19              : 
      20              : #include <boost/corosio/detail/except.hpp>
      21              : #include <boost/corosio/detail/thread_local_ptr.hpp>
      22              : 
      23              : #include <chrono>
      24              : #include <limits>
      25              : 
      26              : #include <errno.h>
      27              : #include <fcntl.h>
      28              : #include <sys/select.h>
      29              : #include <sys/socket.h>
      30              : #include <unistd.h>
      31              : 
      32              : /*
      33              :     select Scheduler - Single Reactor Model
      34              :     =======================================
      35              : 
      36              :     This scheduler mirrors the epoll_scheduler design but uses select() instead
      37              :     of epoll for I/O multiplexing. The thread coordination strategy is identical:
      38              :     one thread becomes the "reactor" while others wait on a condition variable.
      39              : 
      40              :     Thread Model
      41              :     ------------
      42              :     - ONE thread runs select() at a time (the reactor thread)
      43              :     - OTHER threads wait on wakeup_event_ (condition variable) for handlers
      44              :     - When work is posted, exactly one waiting thread wakes via notify_one()
      45              : 
      46              :     Key Differences from epoll
      47              :     --------------------------
      48              :     - Uses self-pipe instead of eventfd for interruption (more portable)
      49              :     - fd_set rebuilding each iteration (O(n) vs O(1) for epoll)
      50              :     - FD_SETSIZE limit (~1024 fds on most systems)
      51              :     - Level-triggered only (no edge-triggered mode)
      52              : 
      53              :     Self-Pipe Pattern
      54              :     -----------------
      55              :     To interrupt a blocking select() call (e.g., when work is posted or a timer
      56              :     expires), we write a byte to pipe_fds_[1]. The read end pipe_fds_[0] is
      57              :     always in the read_fds set, so select() returns immediately. We drain the
      58              :     pipe to clear the readable state.
      59              : 
      60              :     fd-to-op Mapping
      61              :     ----------------
      62              :     We use an unordered_map<int, fd_state> to track which operations are
      63              :     registered for each fd. This allows O(1) lookup when select() returns
      64              :     ready fds. Each fd can have at most one read op and one write op registered.
      65              : */
      66              : 
      67              : namespace boost::corosio::detail {
      68              : 
      69              : namespace {
      70              : 
      71              : struct scheduler_context
      72              : {
      73              :     select_scheduler const* key;
      74              :     scheduler_context* next;
      75              : };
      76              : 
      77              : corosio::detail::thread_local_ptr<scheduler_context> context_stack;
      78              : 
      79              : struct thread_context_guard
      80              : {
      81              :     scheduler_context frame_;
      82              : 
      83           92 :     explicit thread_context_guard(
      84              :         select_scheduler const* ctx) noexcept
      85           92 :         : frame_{ctx, context_stack.get()}
      86              :     {
      87           92 :         context_stack.set(&frame_);
      88           92 :     }
      89              : 
      90           92 :     ~thread_context_guard() noexcept
      91              :     {
      92           92 :         context_stack.set(frame_.next);
      93           92 :     }
      94              : };
      95              : 
      96              : } // namespace
      97              : 
      98          120 : select_scheduler::
      99              : select_scheduler(
     100              :     capy::execution_context& ctx,
     101          120 :     int)
     102          120 :     : pipe_fds_{-1, -1}
     103          120 :     , outstanding_work_(0)
     104          120 :     , stopped_(false)
     105          120 :     , shutdown_(false)
     106          120 :     , max_fd_(-1)
     107          120 :     , reactor_running_(false)
     108          120 :     , reactor_interrupted_(false)
     109          240 :     , idle_thread_count_(0)
     110              : {
     111              :     // Create self-pipe for interrupting select()
     112          120 :     if (::pipe(pipe_fds_) < 0)
     113            0 :         detail::throw_system_error(make_err(errno), "pipe");
     114              : 
     115              :     // Set both ends to non-blocking and close-on-exec
     116          360 :     for (int i = 0; i < 2; ++i)
     117              :     {
     118          240 :         int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
     119          240 :         if (flags == -1)
     120              :         {
     121            0 :             int errn = errno;
     122            0 :             ::close(pipe_fds_[0]);
     123            0 :             ::close(pipe_fds_[1]);
     124            0 :             detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
     125              :         }
     126          240 :         if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
     127              :         {
     128            0 :             int errn = errno;
     129            0 :             ::close(pipe_fds_[0]);
     130            0 :             ::close(pipe_fds_[1]);
     131            0 :             detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
     132              :         }
     133          240 :         if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
     134              :         {
     135            0 :             int errn = errno;
     136            0 :             ::close(pipe_fds_[0]);
     137            0 :             ::close(pipe_fds_[1]);
     138            0 :             detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
     139              :         }
     140              :     }
     141              : 
     142          120 :     timer_svc_ = &get_timer_service(ctx, *this);
     143          120 :     timer_svc_->set_on_earliest_changed(
     144              :         timer_service::callback(
     145              :             this,
     146         2423 :             [](void* p) { static_cast<select_scheduler*>(p)->interrupt_reactor(); }));
     147              : 
     148              :     // Initialize resolver service
     149          120 :     get_resolver_service(ctx, *this);
     150              : 
     151              :     // Initialize signal service
     152          120 :     get_signal_service(ctx, *this);
     153              : 
     154              :     // Push task sentinel to interleave reactor runs with handler execution
     155          120 :     completed_ops_.push(&task_op_);
     156          120 : }
     157              : 
     158          240 : select_scheduler::
     159          120 : ~select_scheduler()
     160              : {
     161          120 :     if (pipe_fds_[0] >= 0)
     162          120 :         ::close(pipe_fds_[0]);
     163          120 :     if (pipe_fds_[1] >= 0)
     164          120 :         ::close(pipe_fds_[1]);
     165          240 : }
     166              : 
     167              : void
     168          120 : select_scheduler::
     169              : shutdown()
     170              : {
     171              :     {
     172          120 :         std::unique_lock lock(mutex_);
     173          120 :         shutdown_ = true;
     174              : 
     175          240 :         while (auto* h = completed_ops_.pop())
     176              :         {
     177          120 :             if (h == &task_op_)
     178          120 :                 continue;
     179            0 :             lock.unlock();
     180            0 :             h->destroy();
     181            0 :             lock.lock();
     182          120 :         }
     183          120 :     }
     184              : 
     185          120 :     outstanding_work_.store(0, std::memory_order_release);
     186              : 
     187          120 :     if (pipe_fds_[1] >= 0)
     188          120 :         interrupt_reactor();
     189              : 
     190          120 :     wakeup_event_.notify_all();
     191          120 : }
     192              : 
     193              : void
     194          168 : select_scheduler::
     195              : post(capy::coro h) const
     196              : {
     197              :     struct post_handler final
     198              :         : scheduler_op
     199              :     {
     200              :         capy::coro h_;
     201              : 
     202              :         explicit
     203          168 :         post_handler(capy::coro h)
     204          168 :             : h_(h)
     205              :         {
     206          168 :         }
     207              : 
     208          336 :         ~post_handler() = default;
     209              : 
     210          168 :         void operator()() override
     211              :         {
     212          168 :             auto h = h_;
     213          168 :             delete this;
     214          168 :             h.resume();
     215          168 :         }
     216              : 
     217            0 :         void destroy() override
     218              :         {
     219            0 :             delete this;
     220            0 :         }
     221              :     };
     222              : 
     223          168 :     auto ph = std::make_unique<post_handler>(h);
     224          168 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
     225              : 
     226          168 :     std::unique_lock lock(mutex_);
     227          168 :     completed_ops_.push(ph.release());
     228          168 :     wake_one_thread_and_unlock(lock);
     229          168 : }
     230              : 
     231              : void
     232       162842 : select_scheduler::
     233              : post(scheduler_op* h) const
     234              : {
     235       162842 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
     236              : 
     237       162842 :     std::unique_lock lock(mutex_);
     238       162842 :     completed_ops_.push(h);
     239       162842 :     wake_one_thread_and_unlock(lock);
     240       162842 : }
     241              : 
     242              : void
     243         2420 : select_scheduler::
     244              : on_work_started() noexcept
     245              : {
     246         2420 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
     247         2420 : }
     248              : 
     249              : void
     250         2414 : select_scheduler::
     251              : on_work_finished() noexcept
     252              : {
     253         4828 :     if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
     254           18 :         stop();
     255         2414 : }
     256              : 
     257              : bool
     258       167632 : select_scheduler::
     259              : running_in_this_thread() const noexcept
     260              : {
     261       167632 :     for (auto* c = context_stack.get(); c != nullptr; c = c->next)
     262       167477 :         if (c->key == this)
     263       167477 :             return true;
     264          155 :     return false;
     265              : }
     266              : 
     267              : void
     268           18 : select_scheduler::
     269              : stop()
     270              : {
     271           18 :     bool expected = false;
     272           18 :     if (stopped_.compare_exchange_strong(expected, true,
     273              :             std::memory_order_release, std::memory_order_relaxed))
     274              :     {
     275              :         // Wake all threads so they notice stopped_ and exit
     276              :         {
     277           18 :             std::lock_guard lock(mutex_);
     278           18 :             wakeup_event_.notify_all();
     279           18 :         }
     280           18 :         interrupt_reactor();
     281              :     }
     282           18 : }
     283              : 
     284              : bool
     285            1 : select_scheduler::
     286              : stopped() const noexcept
     287              : {
     288            1 :     return stopped_.load(std::memory_order_acquire);
     289              : }
     290              : 
     291              : void
     292           34 : select_scheduler::
     293              : restart()
     294              : {
     295           34 :     stopped_.store(false, std::memory_order_release);
     296           34 : }
     297              : 
     298              : std::size_t
     299           89 : select_scheduler::
     300              : run()
     301              : {
     302           89 :     if (stopped_.load(std::memory_order_acquire))
     303            0 :         return 0;
     304              : 
     305          178 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     306              :     {
     307            0 :         stop();
     308            0 :         return 0;
     309              :     }
     310              : 
     311           89 :     thread_context_guard ctx(this);
     312              : 
     313           89 :     std::size_t n = 0;
     314       167501 :     while (do_one(-1))
     315       167412 :         if (n != (std::numeric_limits<std::size_t>::max)())
     316       167412 :             ++n;
     317           89 :     return n;
     318           89 : }
     319              : 
     320              : std::size_t
     321            0 : select_scheduler::
     322              : run_one()
     323              : {
     324            0 :     if (stopped_.load(std::memory_order_acquire))
     325            0 :         return 0;
     326              : 
     327            0 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     328              :     {
     329            0 :         stop();
     330            0 :         return 0;
     331              :     }
     332              : 
     333            0 :     thread_context_guard ctx(this);
     334            0 :     return do_one(-1);
     335            0 : }
     336              : 
     337              : std::size_t
     338            3 : select_scheduler::
     339              : wait_one(long usec)
     340              : {
     341            3 :     if (stopped_.load(std::memory_order_acquire))
     342            0 :         return 0;
     343              : 
     344            6 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     345              :     {
     346            0 :         stop();
     347            0 :         return 0;
     348              :     }
     349              : 
     350            3 :     thread_context_guard ctx(this);
     351            3 :     return do_one(usec);
     352            3 : }
     353              : 
     354              : std::size_t
     355            0 : select_scheduler::
     356              : poll()
     357              : {
     358            0 :     if (stopped_.load(std::memory_order_acquire))
     359            0 :         return 0;
     360              : 
     361            0 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     362              :     {
     363            0 :         stop();
     364            0 :         return 0;
     365              :     }
     366              : 
     367            0 :     thread_context_guard ctx(this);
     368              : 
     369            0 :     std::size_t n = 0;
     370            0 :     while (do_one(0))
     371            0 :         if (n != (std::numeric_limits<std::size_t>::max)())
     372            0 :             ++n;
     373            0 :     return n;
     374            0 : }
     375              : 
     376              : std::size_t
     377            0 : select_scheduler::
     378              : poll_one()
     379              : {
     380            0 :     if (stopped_.load(std::memory_order_acquire))
     381            0 :         return 0;
     382              : 
     383            0 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     384              :     {
     385            0 :         stop();
     386            0 :         return 0;
     387              :     }
     388              : 
     389            0 :     thread_context_guard ctx(this);
     390            0 :     return do_one(0);
     391            0 : }
     392              : 
     393              : void
     394         4523 : select_scheduler::
     395              : register_fd(int fd, select_op* op, int events) const
     396              : {
     397              :     // Validate fd is within select() limits
     398         4523 :     if (fd < 0 || fd >= FD_SETSIZE)
     399            0 :         detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
     400              : 
     401              :     {
     402         4523 :         std::lock_guard lock(mutex_);
     403              : 
     404         4523 :         auto& state = registered_fds_[fd];
     405         4523 :         if (events & event_read)
     406         2343 :             state.read_op = op;
     407         4523 :         if (events & event_write)
     408         2180 :             state.write_op = op;
     409              : 
     410         4523 :         if (fd > max_fd_)
     411          184 :             max_fd_ = fd;
     412         4523 :     }
     413              : 
     414              :     // Wake the reactor so a thread blocked in select() rebuilds its fd_sets
     415              :     // with the newly registered fd.
     416         4523 :     interrupt_reactor();
     417         4523 : }
     418              : 
     419              : void
     420         4530 : select_scheduler::
     421              : deregister_fd(int fd, int events) const
     422              : {
     423         4530 :     std::lock_guard lock(mutex_);
     424              : 
     425         4530 :     auto it = registered_fds_.find(fd);
     426         4530 :     if (it == registered_fds_.end())
     427         4411 :         return;
     428              : 
     429          119 :     if (events & event_read)
     430          119 :         it->second.read_op = nullptr;
     431          119 :     if (events & event_write)
     432            0 :         it->second.write_op = nullptr;
     433              : 
     434              :     // Remove entry if both are null
     435          119 :     if (!it->second.read_op && !it->second.write_op)
     436              :     {
     437          119 :         registered_fds_.erase(it);
     438              : 
     439              :         // Recalculate max_fd_ if needed
     440          119 :         if (fd == max_fd_)
     441              :         {
     442          118 :             max_fd_ = pipe_fds_[0];  // At minimum, the pipe read end
     443          118 :             for (auto& [registered_fd, state] : registered_fds_)
     444              :             {
     445            0 :                 if (registered_fd > max_fd_)
     446            0 :                     max_fd_ = registered_fd;
     447              :             }
     448              :         }
     449              :     }
     450         4530 : }
     451              : 
     452              : void
     453         4523 : select_scheduler::
     454              : work_started() const noexcept
     455              : {
     456         4523 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
     457         4523 : }
     458              : 
     459              : void
     460       167539 : select_scheduler::
     461              : work_finished() const noexcept
     462              : {
     463       335078 :     if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
     464              :     {
     465              :         // Last work item completed - wake all threads so they can exit.
     466           72 :         std::unique_lock lock(mutex_);
     467           72 :         wakeup_event_.notify_all();
     468           72 :         if (reactor_running_ && !reactor_interrupted_)
     469              :         {
     470            0 :             reactor_interrupted_ = true;
     471            0 :             lock.unlock();
     472            0 :             interrupt_reactor();
     473              :         }
     474           72 :     }
     475       167539 : }
     476              : 
     477              : void
     478         7292 : select_scheduler::
     479              : interrupt_reactor() const
     480              : {
     481         7292 :     char byte = 1;
     482         7292 :     [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
     483         7292 : }
     484              : 
     485              : void
     486       163010 : select_scheduler::
     487              : wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
     488              : {
     489       163010 :     if (idle_thread_count_ > 0)
     490              :     {
     491              :         // Idle worker exists - wake it via condvar
     492            0 :         wakeup_event_.notify_one();
     493            0 :         lock.unlock();
     494              :     }
     495       163010 :     else if (reactor_running_ && !reactor_interrupted_)
     496              :     {
     497              :         // No idle workers but reactor is running - interrupt it
     498          208 :         reactor_interrupted_ = true;
     499          208 :         lock.unlock();
     500          208 :         interrupt_reactor();
     501              :     }
     502              :     else
     503              :     {
     504              :         // No one to wake
     505       162802 :         lock.unlock();
     506              :     }
     507       163010 : }
     508              : 
     509              : struct work_guard
     510              : {
     511              :     select_scheduler const* self;
     512       341853 :     ~work_guard() { self->work_finished(); }
     513              : };
     514              : 
     515              : long
     516         6822 : select_scheduler::
     517              : calculate_timeout(long requested_timeout_us) const
     518              : {
     519         6822 :     if (requested_timeout_us == 0)
     520            0 :         return 0;
     521              : 
     522         6822 :     auto nearest = timer_svc_->nearest_expiry();
     523         6822 :     if (nearest == timer_service::time_point::max())
     524           36 :         return requested_timeout_us;
     525              : 
     526         6786 :     auto now = std::chrono::steady_clock::now();
     527         6786 :     if (nearest <= now)
     528           38 :         return 0;
     529              : 
     530         6748 :     auto timer_timeout_us = std::chrono::duration_cast<std::chrono::microseconds>(
     531        13496 :         nearest - now).count();
     532              : 
     533         6748 :     if (requested_timeout_us < 0)
     534         6748 :         return static_cast<long>(timer_timeout_us);
     535              : 
     536            0 :     return static_cast<long>((std::min)(
     537            0 :         static_cast<long long>(requested_timeout_us),
     538            0 :         static_cast<long long>(timer_timeout_us)));
     539              : }
     540              : 
     541              : void
     542        82753 : select_scheduler::
     543              : run_reactor(std::unique_lock<std::mutex>& lock)
     544              : {
     545              :     // Calculate timeout considering timers, use 0 if interrupted
     546        82753 :     long effective_timeout_us = reactor_interrupted_ ? 0 : calculate_timeout(-1);
     547              : 
     548              :     // Build fd_sets from registered_fds_
     549              :     fd_set read_fds, write_fds, except_fds;
     550      1406801 :     FD_ZERO(&read_fds);
     551      1406801 :     FD_ZERO(&write_fds);
     552      1406801 :     FD_ZERO(&except_fds);
     553              : 
     554              :     // Always include the interrupt pipe
     555        82753 :     FD_SET(pipe_fds_[0], &read_fds);
     556        82753 :     int nfds = pipe_fds_[0];
     557              : 
     558              :     // Add registered fds
     559        91479 :     for (auto& [fd, state] : registered_fds_)
     560              :     {
     561         8726 :         if (state.read_op)
     562         6546 :             FD_SET(fd, &read_fds);
     563         8726 :         if (state.write_op)
     564              :         {
     565         2180 :             FD_SET(fd, &write_fds);
     566              :             // Also monitor for errors on connect operations
     567         2180 :             FD_SET(fd, &except_fds);
     568              :         }
     569         8726 :         if (fd > nfds)
     570         6550 :             nfds = fd;
     571              :     }
     572              : 
     573              :     // Convert timeout to timeval
     574              :     struct timeval tv;
     575        82753 :     struct timeval* tv_ptr = nullptr;
     576        82753 :     if (effective_timeout_us >= 0)
     577              :     {
     578        82717 :         tv.tv_sec = effective_timeout_us / 1000000;
     579        82717 :         tv.tv_usec = effective_timeout_us % 1000000;
     580        82717 :         tv_ptr = &tv;
     581              :     }
     582              : 
     583        82753 :     lock.unlock();
     584              : 
     585        82753 :     int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);
     586        82753 :     int saved_errno = errno;
     587              : 
     588              :     // Process timers outside the lock
     589        82753 :     timer_svc_->process_expired();
     590              : 
     591        82753 :     if (ready < 0 && saved_errno != EINTR)
     592            0 :         detail::throw_system_error(make_err(saved_errno), "select");
     593              : 
     594              :     // Re-acquire lock before modifying completed_ops_
     595        82753 :     lock.lock();
     596              : 
     597              :     // Drain the interrupt pipe if readable
     598        82753 :     if (ready > 0 && FD_ISSET(pipe_fds_[0], &read_fds))
     599              :     {
     600              :         char buf[256];
     601         9126 :         while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0) {}
     602              :     }
     603              : 
     604              :     // Process I/O completions
     605        82753 :     int completions_queued = 0;
     606        82753 :     if (ready > 0)
     607              :     {
     608              :         // Iterate over registered fds (copy keys to avoid iterator invalidation)
     609         4565 :         std::vector<int> fds_to_check;
     610         4565 :         fds_to_check.reserve(registered_fds_.size());
     611        11117 :         for (auto& [fd, state] : registered_fds_)
     612         6552 :             fds_to_check.push_back(fd);
     613              : 
     614        11117 :         for (int fd : fds_to_check)
     615              :         {
     616         6552 :             auto it = registered_fds_.find(fd);
     617         6552 :             if (it == registered_fds_.end())
     618            0 :                 continue;
     619              : 
     620         6552 :             auto& state = it->second;
     621              : 
     622              :             // Check for errors (especially for connect operations)
     623         6552 :             bool has_error = FD_ISSET(fd, &except_fds);
     624              : 
     625              :             // Process read readiness
     626         6552 :             if (state.read_op && (FD_ISSET(fd, &read_fds) || has_error))
     627              :             {
     628         2224 :                 auto* op = state.read_op;
     629              :                 // Claim the op by exchanging to unregistered. Both registering and
     630              :                 // registered states mean the op is ours to complete.
     631         2224 :                 auto prev = op->registered.exchange(
     632              :                     select_registration_state::unregistered, std::memory_order_acq_rel);
     633         2224 :                 if (prev != select_registration_state::unregistered)
     634              :                 {
     635         2224 :                     state.read_op = nullptr;
     636              : 
     637         2224 :                     if (has_error)
     638              :                     {
     639            0 :                         int errn = 0;
     640            0 :                         socklen_t len = sizeof(errn);
     641            0 :                         if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
     642            0 :                             errn = errno;
     643            0 :                         if (errn == 0)
     644            0 :                             errn = EIO;
     645            0 :                         op->complete(errn, 0);
     646              :                     }
     647              :                     else
     648              :                     {
     649         2224 :                         op->perform_io();
     650              :                     }
     651              : 
     652         2224 :                     completed_ops_.push(op);
     653         2224 :                     ++completions_queued;
     654              :                 }
     655              :             }
     656              : 
     657              :             // Process write readiness
     658         6552 :             if (state.write_op && (FD_ISSET(fd, &write_fds) || has_error))
     659              :             {
     660         2180 :                 auto* op = state.write_op;
     661              :                 // Claim the op by exchanging to unregistered. Both registering and
     662              :                 // registered states mean the op is ours to complete.
     663         2180 :                 auto prev = op->registered.exchange(
     664              :                     select_registration_state::unregistered, std::memory_order_acq_rel);
     665         2180 :                 if (prev != select_registration_state::unregistered)
     666              :                 {
     667         2180 :                     state.write_op = nullptr;
     668              : 
     669         2180 :                     if (has_error)
     670              :                     {
     671            0 :                         int errn = 0;
     672            0 :                         socklen_t len = sizeof(errn);
     673            0 :                         if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
     674            0 :                             errn = errno;
     675            0 :                         if (errn == 0)
     676            0 :                             errn = EIO;
     677            0 :                         op->complete(errn, 0);
     678              :                     }
     679              :                     else
     680              :                     {
     681         2180 :                         op->perform_io();
     682              :                     }
     683              : 
     684         2180 :                     completed_ops_.push(op);
     685         2180 :                     ++completions_queued;
     686              :                 }
     687              :             }
     688              : 
     689              :             // Clean up empty entries
     690         6552 :             if (!state.read_op && !state.write_op)
     691         4404 :                 registered_fds_.erase(it);
     692              :         }
     693         4565 :     }
     694              : 
     695        82753 :     if (completions_queued > 0)
     696              :     {
     697         2228 :         if (completions_queued == 1)
     698           52 :             wakeup_event_.notify_one();
     699              :         else
     700         2176 :             wakeup_event_.notify_all();
     701              :     }
     702        82753 : }
     703              : 
     704              : std::size_t
     705       167504 : select_scheduler::
     706              : do_one(long timeout_us)
     707              : {
     708       167504 :     std::unique_lock lock(mutex_);
     709              : 
     710              :     for (;;)
     711              :     {
     712       250257 :         if (stopped_.load(std::memory_order_acquire))
     713           18 :             return 0;
     714              : 
     715       250239 :         scheduler_op* op = completed_ops_.pop();
     716              : 
     717       250239 :         if (op == &task_op_)
     718              :         {
     719        82825 :             bool more_handlers = !completed_ops_.empty();
     720              : 
     721        82825 :             if (!more_handlers)
     722              :             {
     723        13788 :                 if (outstanding_work_.load(std::memory_order_acquire) == 0)
     724              :                 {
     725           72 :                     completed_ops_.push(&task_op_);
     726           72 :                     return 0;
     727              :                 }
     728         6822 :                 if (timeout_us == 0)
     729              :                 {
     730            0 :                     completed_ops_.push(&task_op_);
     731            0 :                     return 0;
     732              :                 }
     733              :             }
     734              : 
     735        82753 :             reactor_interrupted_ = more_handlers || timeout_us == 0;
     736        82753 :             reactor_running_ = true;
     737              : 
     738        82753 :             if (more_handlers && idle_thread_count_ > 0)
     739            0 :                 wakeup_event_.notify_one();
     740              : 
     741        82753 :             run_reactor(lock);
     742              : 
     743        82753 :             reactor_running_ = false;
     744        82753 :             completed_ops_.push(&task_op_);
     745        82753 :             continue;
     746        82753 :         }
     747              : 
     748       167414 :         if (op != nullptr)
     749              :         {
     750       167414 :             lock.unlock();
     751       167414 :             work_guard g{this};
     752       167414 :             (*op)();
     753       167414 :             return 1;
     754       167414 :         }
     755              : 
     756            0 :         if (outstanding_work_.load(std::memory_order_acquire) == 0)
     757            0 :             return 0;
     758              : 
     759            0 :         if (timeout_us == 0)
     760            0 :             return 0;
     761              : 
     762            0 :         ++idle_thread_count_;
     763            0 :         if (timeout_us < 0)
     764            0 :             wakeup_event_.wait(lock);
     765              :         else
     766            0 :             wakeup_event_.wait_for(lock, std::chrono::microseconds(timeout_us));
     767            0 :         --idle_thread_count_;
     768        82753 :     }
     769       167504 : }
     770              : 
     771              : } // namespace boost::corosio::detail
     772              : 
     773              : #endif // BOOST_COROSIO_HAS_SELECT
        

Generated by: LCOV version 2.3