LCOV - code coverage report
Current view: top level - src/detail - timer_service.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 91.5 % 200 183
Test Date: 2026-02-04 16:37:34 Functions: 96.3 % 27 26

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2026 Steve Gerbino
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/corosio
       8              : //
       9              : 
      10              : #include "src/detail/timer_service.hpp"
      11              : 
      12              : #include <boost/corosio/detail/scheduler.hpp>
      13              : #include "src/detail/intrusive.hpp"
      14              : #include "src/detail/resume_coro.hpp"
      15              : #include <boost/capy/error.hpp>
      16              : #include <boost/capy/coro.hpp>
      17              : #include <boost/capy/ex/executor_ref.hpp>
      18              : #include <system_error>
      19              : 
      20              : #include <coroutine>
      21              : #include <limits>
      22              : #include <mutex>
      23              : #include <stdexcept>
      24              : #include <stop_token>
      25              : #include <vector>
      26              : 
      27              : namespace boost::corosio::detail {
      28              : 
      29              : class timer_service_impl;
      30              : 
      31              : struct timer_impl
      32              :     : timer::timer_impl
      33              :     , intrusive_list<timer_impl>::node
      34              : {
                            // Per-timer state. Implements the timer::timer_impl interface and is
                            // also an intrusive_list node so that timer_service_impl can keep it
                            // in its timers_ / free_list_ lists.
      35              :     using clock_type = std::chrono::steady_clock;
      36              :     using time_point = clock_type::time_point;
      37              :     using duration = clock_type::duration;
      38              : 
                            // Service that created this impl (set once by the constructor).
      39              :     timer_service_impl* svc_ = nullptr;
                            // Last expiry stored by timer_service_expires_at/_after; returned by
                            // timer_service_expiry.
      40              :     time_point expiry_;
                            // Position of this timer inside timer_service_impl::heap_. The
                            // size_t maximum is the "not in the heap" sentinel.
      41              :     std::size_t heap_index_ = (std::numeric_limits<std::size_t>::max)();
      42              : 
      43              :     // Wait operation state
                            // h_      : coroutine handle captured by wait(); later passed to
                            //           resume_coro, or destroyed by shutdown().
                            // d_      : executor reference passed to resume_coro together with h_.
                            // ec_out_ : if non-null, receives the completion error code just
                            //           before the coroutine is resumed.
                            // token_  : stop token stored by wait(); it is not read anywhere in
                            //           this file.
                            // waiting_: true from wait() until the wait is completed, cancelled
                            //           or torn down by the service.
      44              :     std::coroutine_handle<> h_;
      45              :     capy::executor_ref d_;
      46              :     std::error_code* ec_out_ = nullptr;
      47              :     std::stop_token token_;
      48              :     bool waiting_ = false;
      49              : 
                            // Binds the impl to its service; everything else keeps its default
                            // member initializer.
      50          129 :     explicit timer_impl(timer_service_impl& svc) noexcept
      51          129 :         : svc_(&svc)
      52              :     {
      53          129 :     }
      54              : 
                            // Hands this impl back to the service (defined after
                            // timer_service_impl, below).
      55              :     void release() override;
      56              : 
                            // Starts a wait for the coroutine handle; the arguments are, in
                            // order: handle to resume, executor to resume it through, stop token,
                            // and optional error-code output (defined below).
      57              :     void wait(
      58              :         std::coroutine_handle<>,
      59              :         capy::executor_ref,
      60              :         std::stop_token,
      61              :         std::error_code*) override;
      62              : };
      63              : 
      64              : //------------------------------------------------------------------------------
      65              : 
      66              : class timer_service_impl : public timer_service
      67              : {
                        // Concrete timer_service. Armed timers live in heap_, a binary min-heap
                        // ordered by heap_entry::time_ (heap_[0] is the earliest expiry). mutex_
                        // is taken around every heap_/timers_/free_list_ access in this class
                        // except in shutdown(). Coroutines are always resumed after the lock has
                        // been released.
      68              : public:
      69              :     using clock_type = std::chrono::steady_clock;
      70              :     using time_point = clock_type::time_point;
      71              :     using key_type = timer_service;
      72              : 
      73              : private:
                            // One heap slot: the expiry used for ordering plus the timer it
                            // belongs to. swap_heap() keeps timer_->heap_index_ in sync with
                            // the slot position.
      74              :     struct heap_entry
      75              :     {
      76              :         time_point time_;
      77              :         timer_impl* timer_;
      78              :     };
      79              : 
                            // sched_              : scheduler whose on_work_finished() is called
                            //                       after each completed/cancelled wait.
                            // mutex_              : mutable so the const queries can lock it.
                            // heap_               : min-heap of armed timers.
                            // timers_             : impls currently handed out by create_impl().
                            // free_list_          : impls returned via destroy_impl(), kept for
                            //                       reuse by create_impl().
                            // on_earliest_changed_: invoked by update_timer() when the updated
                            //                       timer ends up at the heap root.
      80              :     scheduler* sched_ = nullptr;
      81              :     mutable std::mutex mutex_;
      82              :     std::vector<heap_entry> heap_;
      83              :     intrusive_list<timer_impl> timers_;
      84              :     intrusive_list<timer_impl> free_list_;
      85              :     callback on_earliest_changed_;
      86              : 
      87              : public:
                            // The execution_context argument is unnamed and unused here; only
                            // the scheduler pointer is retained.
      88          304 :     timer_service_impl(capy::execution_context&, scheduler& sched)
      89          304 :         : timer_service()
      90          304 :         , sched_(&sched)
      91              :     {
      92          304 :     }
      93              : 
                            // Scheduler this service reports work to (used by timer_impl::wait).
      94         5358 :     scheduler& get_scheduler() noexcept { return *sched_; }
      95              : 
                            // Intentionally empty: the timer_impl objects are deleted in
                            // shutdown(), not here.
      96          608 :     ~timer_service_impl()
      97          304 :     {
      98          608 :     }
      99              : 
     100              :     timer_service_impl(timer_service_impl const&) = delete;
     101              :     timer_service_impl& operator=(timer_service_impl const&) = delete;
     102              : 
                            // Installs the callback fired when a newly updated timer becomes
                            // the earliest one. Not synchronized with mutex_.
     103          304 :     void set_on_earliest_changed(callback cb) override
     104              :     {
     105          304 :         on_earliest_changed_ = cb;
     106          304 :     }
     107              : 
                            // Tears down every impl, both live (timers_) and recycled
                            // (free_list_).
                            // NOTE(review): runs without mutex_ and leaves heap_ untouched, so
                            // heap_ may still hold pointers to deleted impls afterwards -
                            // presumably nothing uses the service after shutdown(); confirm.
                            // The live-timer loop body has a hit count of 0 in this report.
     108          304 :     void shutdown() override
     109              :     {
     110              :         // Cancel all waiting timers and destroy coroutine handles
     111              :         // This properly decrements outstanding_work_ for each waiting timer
     112          304 :         while (auto* impl = timers_.pop_front())
     113              :         {
     114            0 :             if (impl->waiting_)
     115              :             {
     116            0 :                 impl->waiting_ = false;
     117              :                 // Destroy the coroutine handle without resuming
     118            0 :                 impl->h_.destroy();
     119              :                 // Decrement work count to avoid leak
     120            0 :                 sched_->on_work_finished();
     121              :             }
     122            0 :             delete impl;
     123            0 :         }
     124          433 :         while (auto* impl = free_list_.pop_front())
     125          129 :             delete impl;
     126          304 :     }
     127              : 
                            // Returns a timer impl linked into timers_, reusing one from
                            // free_list_ when available and allocating otherwise.
                            // NOTE(review): on reuse only heap_index_ is reset; waiting_, h_,
                            // d_, ec_out_, token_ and expiry_ keep whatever the previous user
                            // left - confirm they are always cleared before destroy_impl().
     128         5389 :     timer::timer_impl* create_impl() override
     129              :     {
     130         5389 :         std::lock_guard lock(mutex_);
     131              :         timer_impl* impl;
     132         5389 :         if (auto* p = free_list_.pop_front())
     133              :         {
     134         5260 :             impl = p;
     135         5260 :             impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
     136              :         }
     137              :         else
     138              :         {
     139          129 :             impl = new timer_impl(*this);
     140              :         }
     141         5389 :         timers_.push_back(impl);
     142         5389 :         return impl;
     143         5389 :     }
     144              : 
                            // Recycles an impl: removes it from the heap (if armed), unlinks it
                            // from timers_ and parks it on free_list_. The object is not deleted.
                            // NOTE(review): impl.waiting_ is not examined, so a pending wait is
                            // neither resumed nor matched with on_work_finished() - presumably
                            // the caller cancels first; verify against timer.cpp.
     145         5389 :     void destroy_impl(timer_impl& impl)
     146              :     {
     147         5389 :         std::lock_guard lock(mutex_);
     148         5389 :         remove_timer_impl(impl);
     149         5389 :         timers_.remove(&impl);
     150         5389 :         free_list_.push_back(&impl);
     151         5389 :     }
     152              : 
                            // Arms or re-arms impl for new_time. A wait that is pending on the
                            // timer is cancelled: its coroutine is resumed with
                            // capy::error::canceled once the lock is released. The timer itself
                            // stays in (or is added to) the heap at new_time. Fires
                            // on_earliest_changed_ when the timer sits at the heap root
                            // afterwards, including when it was already the root before.
     153         5392 :     void update_timer(timer_impl& impl, time_point new_time)
     154              :     {
     155         5392 :         bool notify = false;
     156         5392 :         bool was_waiting = false;
     157         5392 :         std::coroutine_handle<> h;
     158         5392 :         capy::executor_ref d;
     159         5392 :         std::error_code* ec_out = nullptr;
     160              : 
     161              :         {
     162         5392 :             std::lock_guard lock(mutex_);
     163              : 
     164              :             // If currently waiting, cancel the pending wait
     165         5392 :             if (impl.waiting_)
     166              :             {
     167            2 :                 was_waiting = true;
     168            2 :                 impl.waiting_ = false;
     169            2 :                 h = impl.h_;
                                        // Copies d_ here, whereas cancel_timer() and
                                        // process_expired() move it out.
     170            2 :                 d = impl.d_;
     171            2 :                 ec_out = impl.ec_out_;
     172              :             }
     173              : 
     174         5392 :             if (impl.heap_index_ < heap_.size())
     175              :             {
     176              :                 // Already in heap, update position
     177            8 :                 time_point old_time = heap_[impl.heap_index_].time_;
     178            8 :                 heap_[impl.heap_index_].time_ = new_time;
     179              : 
                                        // An earlier time can only move the entry towards the
                                        // root, a later (or equal) one only away from it. The
                                        // down_heap branch has a hit count of 0 in this report.
     180            8 :                 if (new_time < old_time)
     181            8 :                     up_heap(impl.heap_index_);
     182              :                 else
     183            0 :                     down_heap(impl.heap_index_);
     184              :             }
     185              :             else
     186              :             {
     187              :                 // Not in heap, add it
     188         5384 :                 impl.heap_index_ = heap_.size();
     189         5384 :                 heap_.push_back({new_time, &impl});
     190         5384 :                 up_heap(heap_.size() - 1);
     191              :             }
     192              : 
     193              :             // Notify if this timer is now the earliest
     194         5392 :             notify = (impl.heap_index_ == 0);
     195         5392 :         }
     196              : 
     197              :         // Resume cancelled waiter outside lock
     198         5392 :         if (was_waiting)
     199              :         {
     200            2 :             if (ec_out)
     201            2 :                 *ec_out = make_error_code(capy::error::canceled);
     202            2 :             resume_coro(d, h);
     203              :             // Call on_work_finished AFTER the coroutine resumes
     204            2 :             sched_->on_work_finished();
     205              :         }
     206              : 
                                // NOTE(review): called unconditionally - assumes
                                // set_on_earliest_changed() has installed a callable; confirm.
     207         5392 :         if (notify)
     208         5380 :             on_earliest_changed_();
     209         5392 :     }
     210              : 
                            // Locked wrapper around remove_timer_impl(): takes the timer out of
                            // the heap without touching any pending wait. This report records
                            // no execution counts for it.
     211              :     void remove_timer(timer_impl& impl)
     212              :     {
     213              :         std::lock_guard lock(mutex_);
     214              :         remove_timer_impl(impl);
     215              :     }
     216              : 
                            // Disarms impl and, if a wait is pending, completes it with
                            // capy::error::canceled. The wait state is captured under the lock
                            // and the coroutine is resumed after the lock is released.
     217           14 :     void cancel_timer(timer_impl& impl)
     218              :     {
     219           14 :         std::coroutine_handle<> h;
     220           14 :         capy::executor_ref d;
     221           14 :         std::error_code* ec_out = nullptr;
     222           14 :         bool was_waiting = false;
     223              : 
     224              :         {
     225           14 :             std::lock_guard lock(mutex_);
     226           14 :             remove_timer_impl(impl);
     227           14 :             if (impl.waiting_)
     228              :             {
     229            4 :                 was_waiting = true;
     230            4 :                 impl.waiting_ = false;
     231            4 :                 h = impl.h_;
     232            4 :                 d = std::move(impl.d_);
     233            4 :                 ec_out = impl.ec_out_;
     234              :             }
     235           14 :         }
     236              : 
     237              :         // Dispatch outside lock
     238           14 :         if (was_waiting)
     239              :         {
     240            4 :             if (ec_out)
     241            4 :                 *ec_out = make_error_code(capy::error::canceled);
     242            4 :             resume_coro(d, h);
     243              :             // Call on_work_finished AFTER the coroutine resumes
     244            4 :             sched_->on_work_finished();
     245              :         }
     246           14 :     }
     247              : 
                            // True when no timer is armed. Hit count is 0 in this report.
                            // NOTE(review): declared noexcept, yet locking a std::mutex can in
                            // principle throw std::system_error, which would terminate here -
                            // confirm this is acceptable. The same applies to nearest_expiry().
     248            0 :     bool empty() const noexcept override
     249              :     {
     250            0 :         std::lock_guard lock(mutex_);
     251            0 :         return heap_.empty();
     252            0 :     }
     253              : 
                            // Earliest armed expiry (the heap root), or time_point::max() when
                            // no timer is armed.
     254        12723 :     time_point nearest_expiry() const noexcept override
     255              :     {
     256        12723 :         std::lock_guard lock(mutex_);
     257        12723 :         return heap_.empty() ? time_point::max() : heap_[0].time_;
     258        12723 :     }
     259              : 
                            // Removes every timer whose time is <= now from the heap and
                            // resumes the coroutines that were waiting on them with a cleared
                            // error code. Returns the number of coroutines resumed (expired
                            // timers nobody was waiting on are not counted).
     260        85697 :     std::size_t process_expired() override
     261              :     {
     262              :         // Collect expired timers while holding lock
     263              :         struct expired_entry
     264              :         {
     265              :             std::coroutine_handle<> h;
     266              :             capy::executor_ref d;
     267              :             std::error_code* ec_out;
     268              :         };
     269        85697 :         std::vector<expired_entry> expired;
     270              : 
     271              :         {
     272        85697 :             std::lock_guard lock(mutex_);
                                    // The clock is sampled once, so every timer in this pass is
                                    // compared against the same instant.
     273        85697 :             auto now = clock_type::now();
     274              : 
     275       176752 :             while (!heap_.empty() && heap_[0].time_ <= now)
     276              :             {
     277         5358 :                 timer_impl* t = heap_[0].timer_;
     278         5358 :                 remove_timer_impl(*t);
     279              : 
     280         5358 :                 if (t->waiting_)
     281              :                 {
     282         5352 :                     t->waiting_ = false;
     283         5352 :                     expired.push_back({t->h_, std::move(t->d_), t->ec_out_});
     284              :                 }
     285              :                 // If not waiting, timer is removed but not dispatched -
     286              :                 // a later wait() sees heap_index_ == max and completes at once
     287              :             }
     288        85697 :         }
     289              : 
     290              :         // Dispatch outside lock
     291        91049 :         for (auto& e : expired)
     292              :         {
     293         5352 :             if (e.ec_out)
     294         5352 :                 *e.ec_out = {};
     295         5352 :             resume_coro(e.d, e.h);
     296              :             // Call on_work_finished AFTER the coroutine resumes, so it has a
     297              :             // chance to add new work before we potentially trigger stop()
     298         5352 :             sched_->on_work_finished();
     299              :         }
     300              : 
     301       171394 :         return expired.size();
     302        85697 :     }
     303              : 
     304              : private:
                            // Takes impl out of heap_ and resets its heap_index_ to the "not in
                            // heap" sentinel; a no-op when the timer is not in the heap. Does
                            // not lock: every caller in this file already holds mutex_.
     305        10761 :     void remove_timer_impl(timer_impl& impl)
     306              :     {
     307        10761 :         std::size_t index = impl.heap_index_;
     308        10761 :         if (index >= heap_.size())
     309         5377 :             return; // Not in heap
     310              : 
     311         5384 :         if (index == heap_.size() - 1)
     312              :         {
     313              :             // Last element, just pop
     314          109 :             impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
     315          109 :             heap_.pop_back();
     316              :         }
     317              :         else
     318              :         {
     319              :             // Swap with last and reheapify
     320         5275 :             swap_heap(index, heap_.size() - 1);
                                    // swap_heap() just wrote the last slot's index into impl, so
                                    // the sentinel has to be stored after the swap.
     321         5275 :             impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
     322         5275 :             heap_.pop_back();
     323              : 
                                    // The entry moved into `index` may belong above or below
                                    // that slot. The up_heap branch has a hit count of 0 in this
                                    // report.
     324         5275 :             if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
     325            0 :                 up_heap(index);
     326              :             else
     327         5275 :                 down_heap(index);
     328              :         }
     329              :     }
     330              : 
                            // Moves the entry at `index` towards the root while it is strictly
                            // earlier than its parent.
     331         5392 :     void up_heap(std::size_t index)
     332              :     {
     333        10655 :         while (index > 0)
     334              :         {
     335         5275 :             std::size_t parent = (index - 1) / 2;
     336         5275 :             if (!(heap_[index].time_ < heap_[parent].time_))
     337           12 :                 break;
     338         5263 :             swap_heap(index, parent);
     339         5263 :             index = parent;
     340              :         }
     341         5392 :     }
     342              : 
                            // Moves the entry at `index` away from the root until it is strictly
                            // earlier than its earliest child (equal times still swap).
     343         5275 :     void down_heap(std::size_t index)
     344              :     {
     345         5275 :         std::size_t child = index * 2 + 1;
     346         5275 :         while (child < heap_.size())
     347              :         {
                                    // Left child if it is the only child or strictly earlier
                                    // than the right one; otherwise the right child.
     348            2 :             std::size_t min_child = (child + 1 == heap_.size() ||
     349            0 :                 heap_[child].time_ < heap_[child + 1].time_)
     350            2 :                 ? child : child + 1;
     351              : 
     352            2 :             if (heap_[index].time_ < heap_[min_child].time_)
     353            2 :                 break;
     354              : 
     355            0 :             swap_heap(index, min_child);
     356            0 :             index = min_child;
     357            0 :             child = index * 2 + 1;
     358              :         }
     359         5275 :     }
     360              : 
                            // Exchanges two heap slots and rewrites both timers' heap_index_ so
                            // each timer keeps pointing at its own slot.
     361        10538 :     void swap_heap(std::size_t i1, std::size_t i2)
     362              :     {
     363        10538 :         heap_entry tmp = heap_[i1];
     364        10538 :         heap_[i1] = heap_[i2];
     365        10538 :         heap_[i2] = tmp;
     366        10538 :         heap_[i1].timer_->heap_index_ = i1;
     367        10538 :         heap_[i2].timer_->heap_index_ = i2;
     368        10538 :     }
     369              : };
     370              : 
     371              : //------------------------------------------------------------------------------
     372              : 
     373              : void
     374         5389 : timer_impl::
     375              : release()
     376              : {
                            // Returns this impl to its service. destroy_impl() disarms it and
                            // parks it on the service's free list; the object is not deleted
                            // here.
     377         5389 :     svc_->destroy_impl(*this);
     378         5389 : }
     379              : 
     380              : void
     381         5364 : timer_impl::
     382              : wait(
     383              :     std::coroutine_handle<> h,
     384              :     capy::executor_ref d,
     385              :     std::stop_token token,
     386              :     std::error_code* ec)
     387              : {
                            // Starts an asynchronous wait on this timer.
                            //   h     - coroutine to resume when the wait completes
                            //   d     - executor reference handed to resume_coro with h
                            //   token - stop token; stored in token_ but not otherwise used in
                            //           this file
                            //   ec    - optional output; cleared on the immediate-completion path
                            // If the timer is not in the service heap, h is resumed right away.
                            // Otherwise the wait state is recorded and the scheduler is told
                            // that work has started; the service completes the wait later.
                            // NOTE(review): heap_index_ is read and the wait state written here
                            // without the service mutex, while the service touches the same
                            // fields under its lock - presumably callers are serialized; confirm.
     388              :     // Check if timer already expired (not in heap anymore)
                            // NOTE(review): heap_index_ also holds the sentinel for a timer that
                            // was never armed or was cancelled, so those waits complete
                            // immediately with a cleared error code too - confirm intended.
     389         5364 :     bool already_expired = (heap_index_ == (std::numeric_limits<std::size_t>::max)());
     390              : 
     391         5364 :     if (already_expired)
     392              :     {
     393              :         // Timer already expired - dispatch immediately
     394            6 :         if (ec)
     395            6 :             *ec = {};
     396              :         // Note: no work tracking needed - we dispatch synchronously
     397            6 :         resume_coro(d, h);
     398            6 :         return;
     399              :     }
     400              : 
     401         5358 :     h_ = h;
     402         5358 :     d_ = std::move(d);
     403         5358 :     token_ = std::move(token);
     404         5358 :     ec_out_ = ec;
     405         5358 :     waiting_ = true;
                            // Paired with the on_work_finished() call the service makes after
                            // it resumes or destroys h_.
     406         5358 :     svc_->get_scheduler().on_work_started();
     407              : }
     408              : 
     409              : //------------------------------------------------------------------------------
     410              : //
     411              : // Extern free functions called from timer.cpp
     412              : //
     413              : //------------------------------------------------------------------------------
     414              : 
     415              : timer::timer_impl*
     416         5389 : timer_service_create(capy::execution_context& ctx)
     417              : {
                            // Looks up the timer_service registered in ctx and asks it for a
                            // timer impl. Throws std::runtime_error when no such service is
                            // found; that path has a hit count of 0 in this report.
     418         5389 :     auto* svc = ctx.find_service<timer_service>();
     419         5389 :     if (!svc)
     420              :     {
     421              :         // Timer service not yet created - this happens if io_context
     422              :         // hasn't been constructed yet, or if the scheduler didn't
     423              :         // initialize the timer service
     424            0 :         throw std::runtime_error("timer_service not found");
     425              :     }
     426         5389 :     return svc->create_impl();
     427              : }
     428              : 
     429              : void
     430         5389 : timer_service_destroy(timer::timer_impl& base) noexcept
     431              : {
                            // Releases an impl obtained from timer_service_create. The
                            // static_cast assumes base really is a detail::timer_impl.
                            // NOTE(review): noexcept, but release() ends up locking the service
                            // mutex - confirm a throwing lock is acceptable here.
     432         5389 :     static_cast<timer_impl&>(base).release();
     433         5389 : }
     434              : 
     435              : timer::time_point
     436           28 : timer_service_expiry(timer::timer_impl& base) noexcept
     437              : {
                            // Returns the expiry last stored by timer_service_expires_at/_after.
                            // Reads expiry_ directly, without taking the service mutex.
     438           28 :     return static_cast<timer_impl&>(base).expiry_;
     439              : }
     440              : 
     441              : void
     442           14 : timer_service_expires_at(timer::timer_impl& base, timer::time_point t)
     443              : {
                            // Sets an absolute expiry: records t on the impl, then lets the
                            // service (re)position the timer in its heap. update_timer() also
                            // cancels a wait that is pending on this timer.
     444           14 :     auto& impl = static_cast<timer_impl&>(base);
     445           14 :     impl.expiry_ = t;
     446           14 :     impl.svc_->update_timer(impl, t);
     447           14 : }
     448              : 
     449              : void
     450         5378 : timer_service_expires_after(timer::timer_impl& base, timer::duration d)
     451              : {
                            // Sets a relative expiry: now + d on timer::clock_type, then the
                            // same update path as timer_service_expires_at.
                            // NOTE(review): now() + d is not guarded against overflow for very
                            // large d (e.g. duration::max()) - confirm callers never pass one.
     452         5378 :     auto& impl = static_cast<timer_impl&>(base);
     453         5378 :     impl.expiry_ = timer::clock_type::now() + d;
     454         5378 :     impl.svc_->update_timer(impl, impl.expiry_);
     455         5378 : }
     456              : 
     457              : void
     458           14 : timer_service_cancel(timer::timer_impl& base) noexcept
     459              : {
                            // Disarms the timer and completes a pending wait, if any, with
                            // capy::error::canceled (see timer_service_impl::cancel_timer).
                            // NOTE(review): noexcept, but cancel_timer() locks a mutex and
                            // resumes a coroutine - confirm neither can throw in practice.
     460           14 :     auto& impl = static_cast<timer_impl&>(base);
     461           14 :     impl.svc_->cancel_timer(impl);
     462           14 : }
     463              : 
     464              : timer_service&
     465          304 : get_timer_service(capy::execution_context& ctx, scheduler& sched)
     466              : {
                            // Creates the timer_service_impl inside ctx and returns it through
                            // its timer_service base. Presumably make_service supplies ctx as the
                            // constructor's first argument and forwards sched as the second -
                            // confirm against capy::execution_context.
     467          304 :     return ctx.make_service<timer_service_impl>(sched);
     468              : }
     469              : 
     470              : } // namespace boost::corosio::detail
        

Generated by: LCOV version 2.3