Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #include "src/detail/timer_service.hpp"
11 :
12 : #include <boost/corosio/detail/scheduler.hpp>
13 : #include "src/detail/intrusive.hpp"
14 : #include "src/detail/resume_coro.hpp"
15 : #include <boost/capy/error.hpp>
16 : #include <boost/capy/coro.hpp>
17 : #include <boost/capy/ex/executor_ref.hpp>
18 : #include <system_error>
19 :
20 : #include <coroutine>
21 : #include <limits>
22 : #include <mutex>
23 : #include <stdexcept>
24 : #include <stop_token>
25 : #include <vector>
26 :
27 : namespace boost::corosio::detail {
28 :
29 : class timer_service_impl;
30 :
31 : struct timer_impl
32 : : timer::timer_impl
33 : , intrusive_list<timer_impl>::node
34 : {
35 : using clock_type = std::chrono::steady_clock;
36 : using time_point = clock_type::time_point;
37 : using duration = clock_type::duration;
38 :
39 : timer_service_impl* svc_ = nullptr;
40 : time_point expiry_;
41 : std::size_t heap_index_ = (std::numeric_limits<std::size_t>::max)();
42 :
43 : // Wait operation state
44 : std::coroutine_handle<> h_;
45 : capy::executor_ref d_;
46 : std::error_code* ec_out_ = nullptr;
47 : std::stop_token token_;
48 : bool waiting_ = false;
49 :
50 129 : explicit timer_impl(timer_service_impl& svc) noexcept
51 129 : : svc_(&svc)
52 : {
53 129 : }
54 :
55 : void release() override;
56 :
57 : void wait(
58 : std::coroutine_handle<>,
59 : capy::executor_ref,
60 : std::stop_token,
61 : std::error_code*) override;
62 : };
63 :
64 : //------------------------------------------------------------------------------
65 :
66 : class timer_service_impl : public timer_service
67 : {
68 : public:
69 : using clock_type = std::chrono::steady_clock;
70 : using time_point = clock_type::time_point;
71 : using key_type = timer_service;
72 :
73 : private:
74 : struct heap_entry
75 : {
76 : time_point time_;
77 : timer_impl* timer_;
78 : };
79 :
80 : scheduler* sched_ = nullptr;
81 : mutable std::mutex mutex_;
82 : std::vector<heap_entry> heap_;
83 : intrusive_list<timer_impl> timers_;
84 : intrusive_list<timer_impl> free_list_;
85 : callback on_earliest_changed_;
86 :
87 : public:
88 304 : timer_service_impl(capy::execution_context&, scheduler& sched)
89 304 : : timer_service()
90 304 : , sched_(&sched)
91 : {
92 304 : }
93 :
94 5358 : scheduler& get_scheduler() noexcept { return *sched_; }
95 :
96 608 : ~timer_service_impl()
97 304 : {
98 608 : }
99 :
100 : timer_service_impl(timer_service_impl const&) = delete;
101 : timer_service_impl& operator=(timer_service_impl const&) = delete;
102 :
103 304 : void set_on_earliest_changed(callback cb) override
104 : {
105 304 : on_earliest_changed_ = cb;
106 304 : }
107 :
108 304 : void shutdown() override
109 : {
110 : // Cancel all waiting timers and destroy coroutine handles
111 : // This properly decrements outstanding_work_ for each waiting timer
112 304 : while (auto* impl = timers_.pop_front())
113 : {
114 0 : if (impl->waiting_)
115 : {
116 0 : impl->waiting_ = false;
117 : // Destroy the coroutine handle without resuming
118 0 : impl->h_.destroy();
119 : // Decrement work count to avoid leak
120 0 : sched_->on_work_finished();
121 : }
122 0 : delete impl;
123 0 : }
124 433 : while (auto* impl = free_list_.pop_front())
125 129 : delete impl;
126 304 : }
127 :
128 5389 : timer::timer_impl* create_impl() override
129 : {
130 5389 : std::lock_guard lock(mutex_);
131 : timer_impl* impl;
132 5389 : if (auto* p = free_list_.pop_front())
133 : {
134 5260 : impl = p;
135 5260 : impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
136 : }
137 : else
138 : {
139 129 : impl = new timer_impl(*this);
140 : }
141 5389 : timers_.push_back(impl);
142 5389 : return impl;
143 5389 : }
144 :
145 5389 : void destroy_impl(timer_impl& impl)
146 : {
147 5389 : std::lock_guard lock(mutex_);
148 5389 : remove_timer_impl(impl);
149 5389 : timers_.remove(&impl);
150 5389 : free_list_.push_back(&impl);
151 5389 : }
152 :
153 5392 : void update_timer(timer_impl& impl, time_point new_time)
154 : {
155 5392 : bool notify = false;
156 5392 : bool was_waiting = false;
157 5392 : std::coroutine_handle<> h;
158 5392 : capy::executor_ref d;
159 5392 : std::error_code* ec_out = nullptr;
160 :
161 : {
162 5392 : std::lock_guard lock(mutex_);
163 :
164 : // If currently waiting, cancel the pending wait
165 5392 : if (impl.waiting_)
166 : {
167 2 : was_waiting = true;
168 2 : impl.waiting_ = false;
169 2 : h = impl.h_;
170 2 : d = impl.d_;
171 2 : ec_out = impl.ec_out_;
172 : }
173 :
174 5392 : if (impl.heap_index_ < heap_.size())
175 : {
176 : // Already in heap, update position
177 8 : time_point old_time = heap_[impl.heap_index_].time_;
178 8 : heap_[impl.heap_index_].time_ = new_time;
179 :
180 8 : if (new_time < old_time)
181 8 : up_heap(impl.heap_index_);
182 : else
183 0 : down_heap(impl.heap_index_);
184 : }
185 : else
186 : {
187 : // Not in heap, add it
188 5384 : impl.heap_index_ = heap_.size();
189 5384 : heap_.push_back({new_time, &impl});
190 5384 : up_heap(heap_.size() - 1);
191 : }
192 :
193 : // Notify if this timer is now the earliest
194 5392 : notify = (impl.heap_index_ == 0);
195 5392 : }
196 :
197 : // Resume cancelled waiter outside lock
198 5392 : if (was_waiting)
199 : {
200 2 : if (ec_out)
201 2 : *ec_out = make_error_code(capy::error::canceled);
202 2 : resume_coro(d, h);
203 : // Call on_work_finished AFTER the coroutine resumes
204 2 : sched_->on_work_finished();
205 : }
206 :
207 5392 : if (notify)
208 5380 : on_earliest_changed_();
209 5392 : }
210 :
211 : void remove_timer(timer_impl& impl)
212 : {
213 : std::lock_guard lock(mutex_);
214 : remove_timer_impl(impl);
215 : }
216 :
217 14 : void cancel_timer(timer_impl& impl)
218 : {
219 14 : std::coroutine_handle<> h;
220 14 : capy::executor_ref d;
221 14 : std::error_code* ec_out = nullptr;
222 14 : bool was_waiting = false;
223 :
224 : {
225 14 : std::lock_guard lock(mutex_);
226 14 : remove_timer_impl(impl);
227 14 : if (impl.waiting_)
228 : {
229 4 : was_waiting = true;
230 4 : impl.waiting_ = false;
231 4 : h = impl.h_;
232 4 : d = std::move(impl.d_);
233 4 : ec_out = impl.ec_out_;
234 : }
235 14 : }
236 :
237 : // Dispatch outside lock
238 14 : if (was_waiting)
239 : {
240 4 : if (ec_out)
241 4 : *ec_out = make_error_code(capy::error::canceled);
242 4 : resume_coro(d, h);
243 : // Call on_work_finished AFTER the coroutine resumes
244 4 : sched_->on_work_finished();
245 : }
246 14 : }
247 :
248 0 : bool empty() const noexcept override
249 : {
250 0 : std::lock_guard lock(mutex_);
251 0 : return heap_.empty();
252 0 : }
253 :
254 12723 : time_point nearest_expiry() const noexcept override
255 : {
256 12723 : std::lock_guard lock(mutex_);
257 12723 : return heap_.empty() ? time_point::max() : heap_[0].time_;
258 12723 : }
259 :
260 85697 : std::size_t process_expired() override
261 : {
262 : // Collect expired timers while holding lock
263 : struct expired_entry
264 : {
265 : std::coroutine_handle<> h;
266 : capy::executor_ref d;
267 : std::error_code* ec_out;
268 : };
269 85697 : std::vector<expired_entry> expired;
270 :
271 : {
272 85697 : std::lock_guard lock(mutex_);
273 85697 : auto now = clock_type::now();
274 :
275 176752 : while (!heap_.empty() && heap_[0].time_ <= now)
276 : {
277 5358 : timer_impl* t = heap_[0].timer_;
278 5358 : remove_timer_impl(*t);
279 :
280 5358 : if (t->waiting_)
281 : {
282 5352 : t->waiting_ = false;
283 5352 : expired.push_back({t->h_, std::move(t->d_), t->ec_out_});
284 : }
285 : // If not waiting, timer is removed but not dispatched -
286 : // wait() will handle this by checking expiry
287 : }
288 85697 : }
289 :
290 : // Dispatch outside lock
291 91049 : for (auto& e : expired)
292 : {
293 5352 : if (e.ec_out)
294 5352 : *e.ec_out = {};
295 5352 : resume_coro(e.d, e.h);
296 : // Call on_work_finished AFTER the coroutine resumes, so it has a
297 : // chance to add new work before we potentially trigger stop()
298 5352 : sched_->on_work_finished();
299 : }
300 :
301 171394 : return expired.size();
302 85697 : }
303 :
304 : private:
305 10761 : void remove_timer_impl(timer_impl& impl)
306 : {
307 10761 : std::size_t index = impl.heap_index_;
308 10761 : if (index >= heap_.size())
309 5377 : return; // Not in heap
310 :
311 5384 : if (index == heap_.size() - 1)
312 : {
313 : // Last element, just pop
314 109 : impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
315 109 : heap_.pop_back();
316 : }
317 : else
318 : {
319 : // Swap with last and reheapify
320 5275 : swap_heap(index, heap_.size() - 1);
321 5275 : impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
322 5275 : heap_.pop_back();
323 :
324 5275 : if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
325 0 : up_heap(index);
326 : else
327 5275 : down_heap(index);
328 : }
329 : }
330 :
331 5392 : void up_heap(std::size_t index)
332 : {
333 10655 : while (index > 0)
334 : {
335 5275 : std::size_t parent = (index - 1) / 2;
336 5275 : if (!(heap_[index].time_ < heap_[parent].time_))
337 12 : break;
338 5263 : swap_heap(index, parent);
339 5263 : index = parent;
340 : }
341 5392 : }
342 :
343 5275 : void down_heap(std::size_t index)
344 : {
345 5275 : std::size_t child = index * 2 + 1;
346 5275 : while (child < heap_.size())
347 : {
348 2 : std::size_t min_child = (child + 1 == heap_.size() ||
349 0 : heap_[child].time_ < heap_[child + 1].time_)
350 2 : ? child : child + 1;
351 :
352 2 : if (heap_[index].time_ < heap_[min_child].time_)
353 2 : break;
354 :
355 0 : swap_heap(index, min_child);
356 0 : index = min_child;
357 0 : child = index * 2 + 1;
358 : }
359 5275 : }
360 :
361 10538 : void swap_heap(std::size_t i1, std::size_t i2)
362 : {
363 10538 : heap_entry tmp = heap_[i1];
364 10538 : heap_[i1] = heap_[i2];
365 10538 : heap_[i2] = tmp;
366 10538 : heap_[i1].timer_->heap_index_ = i1;
367 10538 : heap_[i2].timer_->heap_index_ = i2;
368 10538 : }
369 : };
370 :
371 : //------------------------------------------------------------------------------
372 :
373 : void
374 5389 : timer_impl::
375 : release()
376 : {
377 5389 : svc_->destroy_impl(*this);
378 5389 : }
379 :
380 : void
381 5364 : timer_impl::
382 : wait(
383 : std::coroutine_handle<> h,
384 : capy::executor_ref d,
385 : std::stop_token token,
386 : std::error_code* ec)
387 : {
388 : // Check if timer already expired (not in heap anymore)
389 5364 : bool already_expired = (heap_index_ == (std::numeric_limits<std::size_t>::max)());
390 :
391 5364 : if (already_expired)
392 : {
393 : // Timer already expired - dispatch immediately
394 6 : if (ec)
395 6 : *ec = {};
396 : // Note: no work tracking needed - we dispatch synchronously
397 6 : resume_coro(d, h);
398 6 : return;
399 : }
400 :
401 5358 : h_ = h;
402 5358 : d_ = std::move(d);
403 5358 : token_ = std::move(token);
404 5358 : ec_out_ = ec;
405 5358 : waiting_ = true;
406 5358 : svc_->get_scheduler().on_work_started();
407 : }
408 :
409 : //------------------------------------------------------------------------------
410 : //
411 : // Extern free functions called from timer.cpp
412 : //
413 : //------------------------------------------------------------------------------
414 :
415 : timer::timer_impl*
416 5389 : timer_service_create(capy::execution_context& ctx)
417 : {
418 5389 : auto* svc = ctx.find_service<timer_service>();
419 5389 : if (!svc)
420 : {
421 : // Timer service not yet created - this happens if io_context
422 : // hasn't been constructed yet, or if the scheduler didn't
423 : // initialize the timer service
424 0 : throw std::runtime_error("timer_service not found");
425 : }
426 5389 : return svc->create_impl();
427 : }
428 :
429 : void
430 5389 : timer_service_destroy(timer::timer_impl& base) noexcept
431 : {
432 5389 : static_cast<timer_impl&>(base).release();
433 5389 : }
434 :
435 : timer::time_point
436 28 : timer_service_expiry(timer::timer_impl& base) noexcept
437 : {
438 28 : return static_cast<timer_impl&>(base).expiry_;
439 : }
440 :
441 : void
442 14 : timer_service_expires_at(timer::timer_impl& base, timer::time_point t)
443 : {
444 14 : auto& impl = static_cast<timer_impl&>(base);
445 14 : impl.expiry_ = t;
446 14 : impl.svc_->update_timer(impl, t);
447 14 : }
448 :
449 : void
450 5378 : timer_service_expires_after(timer::timer_impl& base, timer::duration d)
451 : {
452 5378 : auto& impl = static_cast<timer_impl&>(base);
453 5378 : impl.expiry_ = timer::clock_type::now() + d;
454 5378 : impl.svc_->update_timer(impl, impl.expiry_);
455 5378 : }
456 :
457 : void
458 14 : timer_service_cancel(timer::timer_impl& base) noexcept
459 : {
460 14 : auto& impl = static_cast<timer_impl&>(base);
461 14 : impl.svc_->cancel_timer(impl);
462 14 : }
463 :
464 : timer_service&
465 304 : get_timer_service(capy::execution_context& ctx, scheduler& sched)
466 : {
467 304 : return ctx.make_service<timer_service_impl>(sched);
468 : }
469 :
470 : } // namespace boost::corosio::detail
|