libs/corosio/src/corosio/src/detail/timer_service.cpp

91.5% Lines (183/200) 96.2% Functions (25/26) 76.6% Branches (72/94)
libs/corosio/src/corosio/src/detail/timer_service.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include "src/detail/timer_service.hpp"
11
12 #include <boost/corosio/detail/scheduler.hpp>
13 #include "src/detail/intrusive.hpp"
14 #include "src/detail/resume_coro.hpp"
15 #include <boost/capy/error.hpp>
16 #include <boost/capy/coro.hpp>
17 #include <boost/capy/ex/executor_ref.hpp>
18 #include <system_error>
19
20 #include <coroutine>
21 #include <limits>
22 #include <mutex>
23 #include <stdexcept>
24 #include <stop_token>
25 #include <vector>
26
27 namespace boost::corosio::detail {
28
29 class timer_service_impl;
30
31 struct timer_impl
32 : timer::timer_impl
33 , intrusive_list<timer_impl>::node
34 {
35 using clock_type = std::chrono::steady_clock;
36 using time_point = clock_type::time_point;
37 using duration = clock_type::duration;
38
39 timer_service_impl* svc_ = nullptr;
40 time_point expiry_;
41 std::size_t heap_index_ = (std::numeric_limits<std::size_t>::max)();
42
43 // Wait operation state
44 std::coroutine_handle<> h_;
45 capy::executor_ref d_;
46 std::error_code* ec_out_ = nullptr;
47 std::stop_token token_;
48 bool waiting_ = false;
49
50 129 explicit timer_impl(timer_service_impl& svc) noexcept
51 129 : svc_(&svc)
52 {
53 129 }
54
55 void release() override;
56
57 void wait(
58 std::coroutine_handle<>,
59 capy::executor_ref,
60 std::stop_token,
61 std::error_code*) override;
62 };
63
64 //------------------------------------------------------------------------------
65
66 class timer_service_impl : public timer_service
67 {
68 public:
69 using clock_type = std::chrono::steady_clock;
70 using time_point = clock_type::time_point;
71 using key_type = timer_service;
72
73 private:
74 struct heap_entry
75 {
76 time_point time_;
77 timer_impl* timer_;
78 };
79
80 scheduler* sched_ = nullptr;
81 mutable std::mutex mutex_;
82 std::vector<heap_entry> heap_;
83 intrusive_list<timer_impl> timers_;
84 intrusive_list<timer_impl> free_list_;
85 callback on_earliest_changed_;
86
87 public:
88 304 timer_service_impl(capy::execution_context&, scheduler& sched)
89 304 : timer_service()
90 304 , sched_(&sched)
91 {
92 304 }
93
94 5358 scheduler& get_scheduler() noexcept { return *sched_; }
95
96 608 ~timer_service_impl()
97 304 {
98 608 }
99
100 timer_service_impl(timer_service_impl const&) = delete;
101 timer_service_impl& operator=(timer_service_impl const&) = delete;
102
103 304 void set_on_earliest_changed(callback cb) override
104 {
105 304 on_earliest_changed_ = cb;
106 304 }
107
108 304 void shutdown() override
109 {
110 // Cancel all waiting timers and destroy coroutine handles
111 // This properly decrements outstanding_work_ for each waiting timer
112
1/2
✓ Branch 1 taken 304 times.
✗ Branch 2 not taken.
304 while (auto* impl = timers_.pop_front())
113 {
114 if (impl->waiting_)
115 {
116 impl->waiting_ = false;
117 // Destroy the coroutine handle without resuming
118 impl->h_.destroy();
119 // Decrement work count to avoid leak
120 sched_->on_work_finished();
121 }
122 delete impl;
123 }
124
2/2
✓ Branch 1 taken 129 times.
✓ Branch 2 taken 304 times.
433 while (auto* impl = free_list_.pop_front())
125
1/2
✓ Branch 0 taken 129 times.
✗ Branch 1 not taken.
129 delete impl;
126 304 }
127
128 5389 timer::timer_impl* create_impl() override
129 {
130
1/1
✓ Branch 1 taken 5389 times.
5389 std::lock_guard lock(mutex_);
131 timer_impl* impl;
132
2/2
✓ Branch 1 taken 5260 times.
✓ Branch 2 taken 129 times.
5389 if (auto* p = free_list_.pop_front())
133 {
134 5260 impl = p;
135 5260 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
136 }
137 else
138 {
139
1/1
✓ Branch 1 taken 129 times.
129 impl = new timer_impl(*this);
140 }
141 5389 timers_.push_back(impl);
142 5389 return impl;
143 5389 }
144
145 5389 void destroy_impl(timer_impl& impl)
146 {
147
1/1
✓ Branch 1 taken 5389 times.
5389 std::lock_guard lock(mutex_);
148
1/1
✓ Branch 1 taken 5389 times.
5389 remove_timer_impl(impl);
149 5389 timers_.remove(&impl);
150 5389 free_list_.push_back(&impl);
151 5389 }
152
153 5392 void update_timer(timer_impl& impl, time_point new_time)
154 {
155 5392 bool notify = false;
156 5392 bool was_waiting = false;
157 5392 std::coroutine_handle<> h;
158 5392 capy::executor_ref d;
159 5392 std::error_code* ec_out = nullptr;
160
161 {
162
1/1
✓ Branch 1 taken 5392 times.
5392 std::lock_guard lock(mutex_);
163
164 // If currently waiting, cancel the pending wait
165
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 5390 times.
5392 if (impl.waiting_)
166 {
167 2 was_waiting = true;
168 2 impl.waiting_ = false;
169 2 h = impl.h_;
170 2 d = impl.d_;
171 2 ec_out = impl.ec_out_;
172 }
173
174
2/2
✓ Branch 1 taken 8 times.
✓ Branch 2 taken 5384 times.
5392 if (impl.heap_index_ < heap_.size())
175 {
176 // Already in heap, update position
177 8 time_point old_time = heap_[impl.heap_index_].time_;
178 8 heap_[impl.heap_index_].time_ = new_time;
179
180
2/3
✓ Branch 1 taken 8 times.
✓ Branch 4 taken 8 times.
✗ Branch 5 not taken.
8 if (new_time < old_time)
181
1/1
✓ Branch 1 taken 8 times.
8 up_heap(impl.heap_index_);
182 else
183 down_heap(impl.heap_index_);
184 }
185 else
186 {
187 // Not in heap, add it
188 5384 impl.heap_index_ = heap_.size();
189
1/1
✓ Branch 1 taken 5384 times.
5384 heap_.push_back({new_time, &impl});
190
1/1
✓ Branch 2 taken 5384 times.
5384 up_heap(heap_.size() - 1);
191 }
192
193 // Notify if this timer is now the earliest
194 5392 notify = (impl.heap_index_ == 0);
195 5392 }
196
197 // Resume cancelled waiter outside lock
198
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 5390 times.
5392 if (was_waiting)
199 {
200
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if (ec_out)
201 2 *ec_out = make_error_code(capy::error::canceled);
202
1/1
✓ Branch 1 taken 2 times.
2 resume_coro(d, h);
203 // Call on_work_finished AFTER the coroutine resumes
204 2 sched_->on_work_finished();
205 }
206
207
2/2
✓ Branch 0 taken 5380 times.
✓ Branch 1 taken 12 times.
5392 if (notify)
208
1/1
✓ Branch 1 taken 5380 times.
5380 on_earliest_changed_();
209 5392 }
210
211 void remove_timer(timer_impl& impl)
212 {
213 std::lock_guard lock(mutex_);
214 remove_timer_impl(impl);
215 }
216
217 14 void cancel_timer(timer_impl& impl)
218 {
219 14 std::coroutine_handle<> h;
220 14 capy::executor_ref d;
221 14 std::error_code* ec_out = nullptr;
222 14 bool was_waiting = false;
223
224 {
225
1/1
✓ Branch 1 taken 14 times.
14 std::lock_guard lock(mutex_);
226
1/1
✓ Branch 1 taken 14 times.
14 remove_timer_impl(impl);
227
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 10 times.
14 if (impl.waiting_)
228 {
229 4 was_waiting = true;
230 4 impl.waiting_ = false;
231 4 h = impl.h_;
232 4 d = std::move(impl.d_);
233 4 ec_out = impl.ec_out_;
234 }
235 14 }
236
237 // Dispatch outside lock
238
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 10 times.
14 if (was_waiting)
239 {
240
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if (ec_out)
241 4 *ec_out = make_error_code(capy::error::canceled);
242
1/1
✓ Branch 1 taken 4 times.
4 resume_coro(d, h);
243 // Call on_work_finished AFTER the coroutine resumes
244 4 sched_->on_work_finished();
245 }
246 14 }
247
248 bool empty() const noexcept override
249 {
250 std::lock_guard lock(mutex_);
251 return heap_.empty();
252 }
253
254 12723 time_point nearest_expiry() const noexcept override
255 {
256 12723 std::lock_guard lock(mutex_);
257
2/2
✓ Branch 1 taken 76 times.
✓ Branch 2 taken 12647 times.
12723 return heap_.empty() ? time_point::max() : heap_[0].time_;
258 12723 }
259
260 85697 std::size_t process_expired() override
261 {
262 // Collect expired timers while holding lock
263 struct expired_entry
264 {
265 std::coroutine_handle<> h;
266 capy::executor_ref d;
267 std::error_code* ec_out;
268 };
269 85697 std::vector<expired_entry> expired;
270
271 {
272
1/1
✓ Branch 1 taken 85697 times.
85697 std::lock_guard lock(mutex_);
273 85697 auto now = clock_type::now();
274
275
7/7
✓ Branch 1 taken 90784 times.
✓ Branch 2 taken 271 times.
✓ Branch 5 taken 90784 times.
✓ Branch 8 taken 5358 times.
✓ Branch 9 taken 85426 times.
✓ Branch 10 taken 5358 times.
✓ Branch 11 taken 85697 times.
176752 while (!heap_.empty() && heap_[0].time_ <= now)
276 {
277 5358 timer_impl* t = heap_[0].timer_;
278
1/1
✓ Branch 1 taken 5358 times.
5358 remove_timer_impl(*t);
279
280
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 5352 times.
5358 if (t->waiting_)
281 {
282 5352 t->waiting_ = false;
283
1/1
✓ Branch 2 taken 5352 times.
5352 expired.push_back({t->h_, std::move(t->d_), t->ec_out_});
284 }
285 // If not waiting, timer is removed but not dispatched -
286 // wait() will handle this by checking expiry
287 }
288 85697 }
289
290 // Dispatch outside lock
291
2/2
✓ Branch 5 taken 5352 times.
✓ Branch 6 taken 85697 times.
91049 for (auto& e : expired)
292 {
293
1/2
✓ Branch 0 taken 5352 times.
✗ Branch 1 not taken.
5352 if (e.ec_out)
294 5352 *e.ec_out = {};
295
1/1
✓ Branch 1 taken 5352 times.
5352 resume_coro(e.d, e.h);
296 // Call on_work_finished AFTER the coroutine resumes, so it has a
297 // chance to add new work before we potentially trigger stop()
298 5352 sched_->on_work_finished();
299 }
300
301 171394 return expired.size();
302 85697 }
303
304 private:
305 10761 void remove_timer_impl(timer_impl& impl)
306 {
307 10761 std::size_t index = impl.heap_index_;
308
2/2
✓ Branch 1 taken 5377 times.
✓ Branch 2 taken 5384 times.
10761 if (index >= heap_.size())
309 5377 return; // Not in heap
310
311
2/2
✓ Branch 1 taken 109 times.
✓ Branch 2 taken 5275 times.
5384 if (index == heap_.size() - 1)
312 {
313 // Last element, just pop
314 109 impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
315 109 heap_.pop_back();
316 }
317 else
318 {
319 // Swap with last and reheapify
320 5275 swap_heap(index, heap_.size() - 1);
321 5275 impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
322 5275 heap_.pop_back();
323
324
2/6
✗ Branch 0 not taken.
✓ Branch 1 taken 5275 times.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
✗ Branch 8 not taken.
✓ Branch 9 taken 5275 times.
5275 if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
325 up_heap(index);
326 else
327 5275 down_heap(index);
328 }
329 }
330
331 5392 void up_heap(std::size_t index)
332 {
333
2/2
✓ Branch 0 taken 5275 times.
✓ Branch 1 taken 5380 times.
10655 while (index > 0)
334 {
335 5275 std::size_t parent = (index - 1) / 2;
336
2/2
✓ Branch 4 taken 12 times.
✓ Branch 5 taken 5263 times.
5275 if (!(heap_[index].time_ < heap_[parent].time_))
337 12 break;
338 5263 swap_heap(index, parent);
339 5263 index = parent;
340 }
341 5392 }
342
343 5275 void down_heap(std::size_t index)
344 {
345 5275 std::size_t child = index * 2 + 1;
346
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 5273 times.
5275 while (child < heap_.size())
347 {
348 2 std::size_t min_child = (child + 1 == heap_.size() ||
349 heap_[child].time_ < heap_[child + 1].time_)
350
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 ? child : child + 1;
351
352
1/2
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
2 if (heap_[index].time_ < heap_[min_child].time_)
353 2 break;
354
355 swap_heap(index, min_child);
356 index = min_child;
357 child = index * 2 + 1;
358 }
359 5275 }
360
361 10538 void swap_heap(std::size_t i1, std::size_t i2)
362 {
363 10538 heap_entry tmp = heap_[i1];
364 10538 heap_[i1] = heap_[i2];
365 10538 heap_[i2] = tmp;
366 10538 heap_[i1].timer_->heap_index_ = i1;
367 10538 heap_[i2].timer_->heap_index_ = i2;
368 10538 }
369 };
370
371 //------------------------------------------------------------------------------
372
373 void
374 5389 timer_impl::
375 release()
376 {
377 5389 svc_->destroy_impl(*this);
378 5389 }
379
380 void
381 5364 timer_impl::
382 wait(
383 std::coroutine_handle<> h,
384 capy::executor_ref d,
385 std::stop_token token,
386 std::error_code* ec)
387 {
388 // Check if timer already expired (not in heap anymore)
389 5364 bool already_expired = (heap_index_ == (std::numeric_limits<std::size_t>::max)());
390
391
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 5358 times.
5364 if (already_expired)
392 {
393 // Timer already expired - dispatch immediately
394
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 if (ec)
395 6 *ec = {};
396 // Note: no work tracking needed - we dispatch synchronously
397 6 resume_coro(d, h);
398 6 return;
399 }
400
401 5358 h_ = h;
402 5358 d_ = std::move(d);
403 5358 token_ = std::move(token);
404 5358 ec_out_ = ec;
405 5358 waiting_ = true;
406 5358 svc_->get_scheduler().on_work_started();
407 }
408
409 //------------------------------------------------------------------------------
410 //
411 // Extern free functions called from timer.cpp
412 //
413 //------------------------------------------------------------------------------
414
415 timer::timer_impl*
416 5389 timer_service_create(capy::execution_context& ctx)
417 {
418 5389 auto* svc = ctx.find_service<timer_service>();
419
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5389 times.
5389 if (!svc)
420 {
421 // Timer service not yet created - this happens if io_context
422 // hasn't been constructed yet, or if the scheduler didn't
423 // initialize the timer service
424 throw std::runtime_error("timer_service not found");
425 }
426 5389 return svc->create_impl();
427 }
428
429 void
430 5389 timer_service_destroy(timer::timer_impl& base) noexcept
431 {
432 5389 static_cast<timer_impl&>(base).release();
433 5389 }
434
435 timer::time_point
436 28 timer_service_expiry(timer::timer_impl& base) noexcept
437 {
438 28 return static_cast<timer_impl&>(base).expiry_;
439 }
440
441 void
442 14 timer_service_expires_at(timer::timer_impl& base, timer::time_point t)
443 {
444 14 auto& impl = static_cast<timer_impl&>(base);
445 14 impl.expiry_ = t;
446 14 impl.svc_->update_timer(impl, t);
447 14 }
448
449 void
450 5378 timer_service_expires_after(timer::timer_impl& base, timer::duration d)
451 {
452 5378 auto& impl = static_cast<timer_impl&>(base);
453
1/1
✓ Branch 2 taken 5378 times.
5378 impl.expiry_ = timer::clock_type::now() + d;
454 5378 impl.svc_->update_timer(impl, impl.expiry_);
455 5378 }
456
457 void
458 14 timer_service_cancel(timer::timer_impl& base) noexcept
459 {
460 14 auto& impl = static_cast<timer_impl&>(base);
461 14 impl.svc_->cancel_timer(impl);
462 14 }
463
464 timer_service&
465 304 get_timer_service(capy::execution_context& ctx, scheduler& sched)
466 {
467 304 return ctx.make_service<timer_service_impl>(sched);
468 }
469
470 } // namespace boost::corosio::detail
471