libs/corosio/src/corosio/src/detail/epoll/scheduler.cpp

76.1% Lines (325/427) 89.2% Functions (33/37) 64.6% Branches (170/263)
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/scheduler.hpp"
15 #include "src/detail/epoll/op.hpp"
16 #include "src/detail/make_err.hpp"
17 #include "src/detail/posix/resolver_service.hpp"
18 #include "src/detail/posix/signals.hpp"
19
20 #include <boost/corosio/detail/except.hpp>
21 #include <boost/corosio/detail/thread_local_ptr.hpp>
22
23 #include <atomic>
24 #include <chrono>
25 #include <limits>
26
27 #include <errno.h>
28 #include <fcntl.h>
29 #include <sys/epoll.h>
30 #include <sys/eventfd.h>
31 #include <sys/socket.h>
32 #include <sys/timerfd.h>
33 #include <unistd.h>
34
35 /*
36 epoll Scheduler - Single Reactor Model
37 ======================================
38
39 This scheduler uses a single-reactor coordination strategy to provide
40 handler parallelism while avoiding the thundering-herd problem.
41 Instead of all threads blocking on epoll_wait(), one thread becomes the
42 "reactor" while others wait on a condition variable for handler work.
43
44 Thread Model
45 ------------
46 - ONE thread runs epoll_wait() at a time (the reactor thread)
47 - OTHER threads wait on wakeup_event_ (condition variable) for handlers
48 - When work is posted, exactly one waiting thread wakes via notify_one()
49 - This matches Windows IOCP semantics where N posted items wake N threads
50
51 Event Loop Structure (do_one)
52 -----------------------------
53 1. Lock mutex, try to pop handler from queue
54 2. If got handler: execute it (unlocked), return
55 3. If queue empty and no reactor running: become reactor
56 - Run epoll_wait (unlocked), queue I/O completions, loop back
57 4. If queue empty and reactor running: wait on condvar for work
58
59 The reactor_running_ flag ensures only one thread owns epoll_wait().
60 After the reactor queues I/O completions, it loops back to try getting
61 a handler, giving priority to handler execution over more I/O polling.
62
63 Wake Coordination (wake_one_thread_and_unlock)
64 ----------------------------------------------
65 When posting work:
66 - If idle threads exist: notify_one() wakes exactly one worker
67 - Else if reactor running: interrupt via eventfd write
68 - Else: no-op (thread will find work when it checks queue)
69
70 This is critical for matching IOCP behavior. With the old model, posting
71 N handlers would wake all threads (thundering herd). Now each post()
72 wakes at most one thread, and that thread handles exactly one item.
73
74 Work Counting
75 -------------
76 outstanding_work_ tracks pending operations. When it hits zero, run()
77 returns. Each operation increments on start, decrements on completion.
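
   A minimal sketch (illustrative; `sched` names an epoll_scheduler):

       sched.on_work_started();    // outstanding_work_: n -> n+1
       // ... launch an async operation ...
       sched.on_work_finished();   // n+1 -> n; reaching zero calls stop()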
78
79 Timer Integration
80 -----------------
81 Timers are handled by timer_service. A timerfd registered with epoll
82 wakes the reactor at the nearest expiry. When a new timer is scheduled
83 earlier than the current nearest, timer_service fires the
84 on_earliest_changed callback, which re-arms the timerfd via update_timerfd().
85 */
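
/*
Usage sketch (illustrative only; not part of this translation unit).
`my_coro` is a hypothetical capy::coro and `sched` is assumed to be an
epoll_scheduler reachable from its execution_context. One pool thread
becomes the reactor; the rest park on wakeup_event_:

    std::vector<std::thread> pool;
    for (int i = 0; i != 4; ++i)
        pool.emplace_back([&]{ sched.run(); });  // one becomes the reactor
    sched.post(my_coro);                         // wakes at most one thread
    for (auto& t : pool)
        t.join();                                // run() returns at zero work
*/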
86
87 namespace boost::corosio::detail {
88
89 namespace {
90
91 struct scheduler_context
92 {
93 epoll_scheduler const* key;
94 scheduler_context* next;
95 op_queue private_queue;
96 long private_outstanding_work;
97
98 155 scheduler_context(epoll_scheduler const* k, scheduler_context* n)
99 155 : key(k)
100 155 , next(n)
101 155 , private_outstanding_work(0)
102 {
103 155 }
104 };
105
106 corosio::detail::thread_local_ptr<scheduler_context> context_stack;
107
108 struct thread_context_guard
109 {
110 scheduler_context frame_;
111
112 155 explicit thread_context_guard(
113 epoll_scheduler const* ctx) noexcept
114 155 : frame_(ctx, context_stack.get())
115 {
116 155 context_stack.set(&frame_);
117 155 }
118
119 155 ~thread_context_guard() noexcept
120 {
121
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 155 times.
155 if (!frame_.private_queue.empty())
122 frame_.key->drain_thread_queue(frame_.private_queue, frame_.private_outstanding_work);
123 155 context_stack.set(frame_.next);
124 155 }
125 };
126
127 scheduler_context*
128 344147 find_context(epoll_scheduler const* self) noexcept
129 {
130
2/2
✓ Branch 1 taken 342498 times.
✓ Branch 2 taken 1649 times.
344147 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
131
1/2
✓ Branch 0 taken 342498 times.
✗ Branch 1 not taken.
342498 if (c->key == self)
132 342498 return c;
133 1649 return nullptr;
134 }
135
136 } // namespace
137
138 184 epoll_scheduler::
139 epoll_scheduler(
140 capy::execution_context& ctx,
141 184 int)
142 184 : epoll_fd_(-1)
143 184 , event_fd_(-1)
144 184 , timer_fd_(-1)
145 184 , outstanding_work_(0)
146 184 , stopped_(false)
147 184 , shutdown_(false)
148 184 , reactor_running_(false)
149 184 , reactor_interrupted_(false)
150 368 , idle_thread_count_(0)
151 {
152 184 epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
153
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 184 times.
184 if (epoll_fd_ < 0)
154 detail::throw_system_error(make_err(errno), "epoll_create1");
155
156 184 event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
157
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 184 times.
184 if (event_fd_ < 0)
158 {
159 int errn = errno;
160 ::close(epoll_fd_);
161 detail::throw_system_error(make_err(errn), "eventfd");
162 }
163
164 184 timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
165
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 184 times.
184 if (timer_fd_ < 0)
166 {
167 int errn = errno;
168 ::close(event_fd_);
169 ::close(epoll_fd_);
170 detail::throw_system_error(make_err(errn), "timerfd_create");
171 }
172
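    // Register the eventfd with a null data.ptr; the reactor loop treats a
    // null pointer as "wakeup event" and simply drains the counter.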
173 184 epoll_event ev{};
174 184 ev.events = EPOLLIN | EPOLLET;
175 184 ev.data.ptr = nullptr;
176
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 184 times.
184 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
177 {
178 int errn = errno;
179 ::close(timer_fd_);
180 ::close(event_fd_);
181 ::close(epoll_fd_);
182 detail::throw_system_error(make_err(errn), "epoll_ctl");
183 }
184
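    // Register the timerfd with data.ptr == &timer_fd_ so the reactor loop
    // can distinguish timer expiries from socket events.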
185 184 epoll_event timer_ev{};
186 184 timer_ev.events = EPOLLIN | EPOLLERR;
187 184 timer_ev.data.ptr = &timer_fd_;
188
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 184 times.
184 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
189 {
190 int errn = errno;
191 ::close(timer_fd_);
192 ::close(event_fd_);
193 ::close(epoll_fd_);
194 detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
195 }
196
197
1/1
✓ Branch 1 taken 184 times.
184 timer_svc_ = &get_timer_service(ctx, *this);
198
1/1
✓ Branch 3 taken 184 times.
184 timer_svc_->set_on_earliest_changed(
199 timer_service::callback(
200 this,
201 2957 [](void* p) { static_cast<epoll_scheduler*>(p)->update_timerfd(); }));
202
203 // Initialize resolver service
204
1/1
✓ Branch 1 taken 184 times.
184 get_resolver_service(ctx, *this);
205
206 // Initialize signal service
207
1/1
✓ Branch 1 taken 184 times.
184 get_signal_service(ctx, *this);
208
209 // Push task sentinel to interleave reactor runs with handler execution
210 184 completed_ops_.push(&task_op_);
211 184 }
212
213 368 epoll_scheduler::
214 184 ~epoll_scheduler()
215 {
216
1/2
✓ Branch 0 taken 184 times.
✗ Branch 1 not taken.
184 if (timer_fd_ >= 0)
217 184 ::close(timer_fd_);
218
1/2
✓ Branch 0 taken 184 times.
✗ Branch 1 not taken.
184 if (event_fd_ >= 0)
219 184 ::close(event_fd_);
220
1/2
✓ Branch 0 taken 184 times.
✗ Branch 1 not taken.
184 if (epoll_fd_ >= 0)
221 184 ::close(epoll_fd_);
222 368 }
223
224 void
225 184 epoll_scheduler::
226 shutdown()
227 {
228 {
229
1/1
✓ Branch 1 taken 184 times.
184 std::unique_lock lock(mutex_);
230 184 shutdown_ = true;
231
232
2/2
✓ Branch 1 taken 184 times.
✓ Branch 2 taken 184 times.
368 while (auto* h = completed_ops_.pop())
233 {
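        // Destroy abandoned handlers; the task sentinel is not a real
        // handler and is skipped rather than destroyed.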
234
1/2
✓ Branch 0 taken 184 times.
✗ Branch 1 not taken.
184 if (h == &task_op_)
235 184 continue;
236 lock.unlock();
237 h->destroy();
238 lock.lock();
239 184 }
240 184 }
241
242 184 outstanding_work_.store(0, std::memory_order_release);
243
244
1/2
✓ Branch 0 taken 184 times.
✗ Branch 1 not taken.
184 if (event_fd_ >= 0)
245 184 interrupt_reactor();
246
247 184 wakeup_event_.notify_all();
248 184 }
249
250 void
251 1639 epoll_scheduler::
252 post(capy::coro h) const
253 {
254 struct post_handler final
255 : scheduler_op
256 {
257 capy::coro h_;
258
259 explicit
260 1639 post_handler(capy::coro h)
261 1639 : h_(h)
262 {
263 1639 }
264
265 3278 ~post_handler() = default;
266
267 1639 void operator()() override
268 {
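            // Copy the handle out and free this op first; only the local
            // copy is touched once `delete this` has run.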
269 1639 auto h = h_;
270
1/2
✓ Branch 0 taken 1639 times.
✗ Branch 1 not taken.
1639 delete this;
271 std::atomic_thread_fence(std::memory_order_acquire);
272
1/1
✓ Branch 1 taken 1639 times.
1639 h.resume();
273 1639 }
274
275 void destroy() override
276 {
277 delete this;
278 }
279 };
280
281
1/1
✓ Branch 1 taken 1639 times.
1639 auto ph = std::make_unique<post_handler>(h);
282
283 // Fast path: same thread posts to private queue without locking
284
2/2
✓ Branch 1 taken 16 times.
✓ Branch 2 taken 1623 times.
1639 if (auto* ctx = find_context(this))
285 {
286 16 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
287 16 ++ctx->private_outstanding_work;
288 16 ctx->private_queue.push(ph.release());
289 16 return;
290 }
291
292 // Slow path: cross-thread post requires mutex
293 1623 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
294
295
1/1
✓ Branch 1 taken 1623 times.
1623 std::unique_lock lock(mutex_);
296 1623 completed_ops_.push(ph.release());
297
1/1
✓ Branch 1 taken 1623 times.
1623 wake_one_thread_and_unlock(lock);
298 1639 }
299
300 void
301 167366 epoll_scheduler::
302 post(scheduler_op* h) const
303 {
304 // Fast path: same thread posts to private queue without locking
305
2/2
✓ Branch 1 taken 167340 times.
✓ Branch 2 taken 26 times.
167366 if (auto* ctx = find_context(this))
306 {
307 167340 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
308 167340 ++ctx->private_outstanding_work;
309 167340 ctx->private_queue.push(h);
310 167340 return;
311 }
312
313 // Slow path: cross-thread post requires mutex
314 26 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
315
316
1/1
✓ Branch 1 taken 26 times.
26 std::unique_lock lock(mutex_);
317 26 completed_ops_.push(h);
318
1/1
✓ Branch 1 taken 26 times.
26 wake_one_thread_and_unlock(lock);
319 26 }
320
321 void
322 2980 epoll_scheduler::
323 on_work_started() noexcept
324 {
325 2980 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
326 2980 }
327
328 void
329 2948 epoll_scheduler::
330 on_work_finished() noexcept
331 {
332
2/2
✓ Branch 0 taken 18 times.
✓ Branch 1 taken 2930 times.
5896 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
333 18 stop();
334 2948 }
335
336 bool
337 3193 epoll_scheduler::
338 running_in_this_thread() const noexcept
339 {
340
2/2
✓ Branch 1 taken 2983 times.
✓ Branch 2 taken 210 times.
3193 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
341
1/2
✓ Branch 0 taken 2983 times.
✗ Branch 1 not taken.
2983 if (c->key == this)
342 2983 return true;
343 210 return false;
344 }
345
346 void
347 37 epoll_scheduler::
348 stop()
349 {
350 37 bool expected = false;
351
1/2
✓ Branch 1 taken 37 times.
✗ Branch 2 not taken.
37 if (stopped_.compare_exchange_strong(expected, true,
352 std::memory_order_release, std::memory_order_relaxed))
353 {
354 // Wake all threads so they notice stopped_ and exit
355 {
356
1/1
✓ Branch 1 taken 37 times.
37 std::lock_guard lock(mutex_);
357 37 wakeup_event_.notify_all();
358 37 }
359
1/1
✓ Branch 1 taken 37 times.
37 interrupt_reactor();
360 }
361 37 }
362
363 bool
364 16 epoll_scheduler::
365 stopped() const noexcept
366 {
367 16 return stopped_.load(std::memory_order_acquire);
368 }
369
370 void
371 49 epoll_scheduler::
372 restart()
373 {
374 49 stopped_.store(false, std::memory_order_release);
375 49 }
376
377 std::size_t
378 175 epoll_scheduler::
379 run()
380 {
381
2/2
✓ Branch 1 taken 20 times.
✓ Branch 2 taken 155 times.
175 if (stopped_.load(std::memory_order_acquire))
382 20 return 0;
383
384
2/2
✓ Branch 1 taken 11 times.
✓ Branch 2 taken 144 times.
310 if (outstanding_work_.load(std::memory_order_acquire) == 0)
385 {
386
1/1
✓ Branch 1 taken 11 times.
11 stop();
387 11 return 0;
388 }
389
390 144 thread_context_guard ctx(this);
391
392 144 std::size_t n = 0;
393
3/3
✓ Branch 1 taken 174572 times.
✓ Branch 3 taken 174428 times.
✓ Branch 4 taken 144 times.
174572 while (do_one(-1))
394
1/2
✓ Branch 1 taken 174428 times.
✗ Branch 2 not taken.
174428 if (n != (std::numeric_limits<std::size_t>::max)())
395 174428 ++n;
396 144 return n;
397 144 }
398
399 std::size_t
400 2 epoll_scheduler::
401 run_one()
402 {
403
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
2 if (stopped_.load(std::memory_order_acquire))
404 return 0;
405
406
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
407 {
408 stop();
409 return 0;
410 }
411
412 2 thread_context_guard ctx(this);
413
1/1
✓ Branch 1 taken 2 times.
2 return do_one(-1);
414 2 }
415
416 std::size_t
417 10 epoll_scheduler::
418 wait_one(long usec)
419 {
420
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 10 times.
10 if (stopped_.load(std::memory_order_acquire))
421 return 0;
422
423
2/2
✓ Branch 1 taken 4 times.
✓ Branch 2 taken 6 times.
20 if (outstanding_work_.load(std::memory_order_acquire) == 0)
424 {
425
1/1
✓ Branch 1 taken 4 times.
4 stop();
426 4 return 0;
427 }
428
429 6 thread_context_guard ctx(this);
430
1/1
✓ Branch 1 taken 6 times.
6 return do_one(usec);
431 6 }
432
433 std::size_t
434 2 epoll_scheduler::
435 poll()
436 {
437
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
2 if (stopped_.load(std::memory_order_acquire))
438 return 0;
439
440
2/2
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
441 {
442
1/1
✓ Branch 1 taken 1 time.
1 stop();
443 1 return 0;
444 }
445
446 1 thread_context_guard ctx(this);
447
448 1 std::size_t n = 0;
449
3/3
✓ Branch 1 taken 3 times.
✓ Branch 3 taken 2 times.
✓ Branch 4 taken 1 time.
3 while (do_one(0))
450
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (n != (std::numeric_limits<std::size_t>::max)())
451 2 ++n;
452 1 return n;
453 1 }
454
455 std::size_t
456 4 epoll_scheduler::
457 poll_one()
458 {
459
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (stopped_.load(std::memory_order_acquire))
460 return 0;
461
462
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2 times.
8 if (outstanding_work_.load(std::memory_order_acquire) == 0)
463 {
464
1/1
✓ Branch 1 taken 2 times.
2 stop();
465 2 return 0;
466 }
467
468 2 thread_context_guard ctx(this);
469
1/1
✓ Branch 1 taken 2 times.
2 return do_one(0);
470 2 }
471
472 void
473 5496 epoll_scheduler::
474 register_descriptor(int fd, descriptor_data* desc) const
475 {
476 5496 epoll_event ev{};
477 5496 ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
478 5496 ev.data.ptr = desc;
479
480
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5496 times.
5496 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
481 detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
482
483 5496 desc->registered_events = ev.events;
484 5496 desc->is_registered = true;
485 5496 desc->fd = fd;
486 5496 desc->read_ready.store(false, std::memory_order_relaxed);
487 5496 desc->write_ready.store(false, std::memory_order_relaxed);
488 5496 }
489
490 void
491 epoll_scheduler::
492 update_descriptor_events(int, descriptor_data*, std::uint32_t) const
493 {
494 // Provides memory fence for operation pointer visibility across threads
495 std::atomic_thread_fence(std::memory_order_seq_cst);
496 }
497
498 void
499 5496 epoll_scheduler::
500 deregister_descriptor(int fd) const
501 {
502 5496 ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
503 5496 }
504
505 void
506 5604 epoll_scheduler::
507 work_started() const noexcept
508 {
509 5604 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
510 5604 }
511
512 void
513 174641 epoll_scheduler::
514 work_finished() const noexcept
515 {
516
2/2
✓ Branch 0 taken 130 times.
✓ Branch 1 taken 174511 times.
349282 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
517 {
518 // Last work item completed - wake all threads so they can exit.
519 // notify_all() wakes threads waiting on the condvar.
520 // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
521 // Both are needed because they target different blocking mechanisms.
522 130 std::unique_lock lock(mutex_);
523 130 wakeup_event_.notify_all();
524
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 130 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
130 if (reactor_running_ && !reactor_interrupted_)
525 {
526 reactor_interrupted_ = true;
527 lock.unlock();
528 interrupt_reactor();
529 }
530 130 }
531 174641 }
532
533 void
534 epoll_scheduler::
535 drain_thread_queue(op_queue& queue, long count) const
536 {
537 std::lock_guard lock(mutex_);
538 // Note: outstanding_work_ was already incremented when posting
539 completed_ops_.splice(queue);
540 if (count > 0)
541 wakeup_event_.notify_all();
542 }
543
544 void
545 247 epoll_scheduler::
546 interrupt_reactor() const
547 {
548 // Only write if not already armed to avoid redundant writes
549 247 bool expected = false;
550
2/2
✓ Branch 1 taken 216 times.
✓ Branch 2 taken 31 times.
247 if (eventfd_armed_.compare_exchange_strong(expected, true,
551 std::memory_order_release, std::memory_order_relaxed))
552 {
553 216 std::uint64_t val = 1;
554
1/1
✓ Branch 1 taken 216 times.
216 [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
555 }
556 247 }
557
558 void
559 1649 epoll_scheduler::
560 wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
561 {
562
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1649 times.
1649 if (idle_thread_count_ > 0)
563 {
564 wakeup_event_.notify_one();
565 lock.unlock();
566 }
567
3/4
✓ Branch 0 taken 26 times.
✓ Branch 1 taken 1623 times.
✓ Branch 2 taken 26 times.
✗ Branch 3 not taken.
1649 else if (reactor_running_ && !reactor_interrupted_)
568 {
569 26 reactor_interrupted_ = true;
570 26 lock.unlock();
571 26 interrupt_reactor();
572 }
573 else
574 {
575 1623 lock.unlock();
576 }
577 1649 }
578
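// RAII helper: signals work_finished() when a handler returns or throws.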
579 struct work_guard
580 {
581 epoll_scheduler const* self;
582 ~work_guard() { self->work_finished(); }
583 };
584
585 void
586 5901 epoll_scheduler::
587 update_timerfd() const
588 {
589 5901 auto nearest = timer_svc_->nearest_expiry();
590
591 5901 itimerspec ts{};
592 5901 int flags = 0;
593
594
3/3
✓ Branch 2 taken 5901 times.
✓ Branch 4 taken 5861 times.
✓ Branch 5 taken 40 times.
5901 if (nearest == timer_service::time_point::max())
595 {
596 // No timers - disarm by setting to 0 (relative)
597 }
598 else
599 {
600 5861 auto now = std::chrono::steady_clock::now();
601
3/3
✓ Branch 1 taken 5861 times.
✓ Branch 4 taken 71 times.
✓ Branch 5 taken 5790 times.
5861 if (nearest <= now)
602 {
603 // Use 1ns instead of 0 - zero disarms the timerfd
604 71 ts.it_value.tv_nsec = 1;
605 }
606 else
607 {
608 5790 auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
609
1/1
✓ Branch 1 taken 5790 times.
11580 nearest - now).count();
610 5790 ts.it_value.tv_sec = nsec / 1000000000;
611 5790 ts.it_value.tv_nsec = nsec % 1000000000;
612 // Ensure non-zero to avoid disarming if duration rounds to 0
613
3/4
✓ Branch 0 taken 5780 times.
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 5780 times.
5790 if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
614 ts.it_value.tv_nsec = 1;
615 }
616 }
617
618
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5901 times.
5901 if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
619 detail::throw_system_error(make_err(errno), "timerfd_settime");
620 5901 }
621
622 void
623 87508 epoll_scheduler::
624 run_reactor(std::unique_lock<std::mutex>& lock)
625 {
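    // A pending interrupt means a wake was requested (or handlers are already
    // queued), so do one non-blocking sweep; otherwise block indefinitely.
    // Timer wakeups arrive via the timerfd, not the epoll timeout.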
626
2/2
✓ Branch 0 taken 81821 times.
✓ Branch 1 taken 5687 times.
87508 int timeout_ms = reactor_interrupted_ ? 0 : -1;
627
628
1/1
✓ Branch 1 taken 87508 times.
87508 lock.unlock();
629
630 // --- Event loop runs WITHOUT the mutex (like Asio) ---
631
632 epoll_event events[128];
633
1/1
✓ Branch 1 taken 87508 times.
87508 int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
634 87508 int saved_errno = errno;
635
636
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 87508 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
87508 if (nfds < 0 && saved_errno != EINTR)
637 detail::throw_system_error(make_err(saved_errno), "epoll_wait");
638
639 87508 bool check_timers = false;
640 87508 op_queue local_ops;
641 87508 int completions_queued = 0;
642
643 // Process events without holding the mutex
644
2/2
✓ Branch 0 taken 92088 times.
✓ Branch 1 taken 87508 times.
179596 for (int i = 0; i < nfds; ++i)
645 {
646
2/2
✓ Branch 0 taken 32 times.
✓ Branch 1 taken 92056 times.
92088 if (events[i].data.ptr == nullptr)
647 {
648 std::uint64_t val;
649
1/1
✓ Branch 1 taken 32 times.
32 [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
650 32 eventfd_armed_.store(false, std::memory_order_relaxed);
651 32 continue;
652 32 }
653
654
2/2
✓ Branch 0 taken 2944 times.
✓ Branch 1 taken 89112 times.
92056 if (events[i].data.ptr == &timer_fd_)
655 {
656 std::uint64_t expirations;
657
1/1
✓ Branch 1 taken 2944 times.
2944 [[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations));
658 2944 check_timers = true;
659 2944 continue;
660 2944 }
661
662 89112 auto* desc = static_cast<descriptor_data*>(events[i].data.ptr);
663 89112 std::uint32_t ev = events[i].events;
664 89112 int err = 0;
665
666
2/2
✓ Branch 0 taken 47 times.
✓ Branch 1 taken 89065 times.
89112 if (ev & (EPOLLERR | EPOLLHUP))
667 {
668 47 socklen_t len = sizeof(err);
669
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 47 times.
47 if (::getsockopt(desc->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
670 err = errno;
671
2/2
✓ Branch 0 taken 46 times.
✓ Branch 1 taken 1 time.
47 if (err == 0)
672 46 err = EIO;
673 }
674
675
2/2
✓ Branch 0 taken 37468 times.
✓ Branch 1 taken 51644 times.
89112 if (ev & EPOLLIN)
676 {
677 37468 auto* op = desc->read_op.exchange(nullptr, std::memory_order_acq_rel);
678
2/2
✓ Branch 0 taken 2762 times.
✓ Branch 1 taken 34706 times.
37468 if (op)
679 {
680
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2762 times.
2762 if (err)
681 {
682 op->complete(err, 0);
683 local_ops.push(op);
684 ++completions_queued;
685 }
686 else
687 {
688 2762 op->perform_io();
689
2/4
✓ Branch 0 taken 2762 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 2762 times.
2762 if (op->errn == EAGAIN || op->errn == EWOULDBLOCK)
690 {
691 op->errn = 0;
692 desc->read_op.store(op, std::memory_order_release);
693 }
694 else
695 {
696 2762 local_ops.push(op);
697 2762 ++completions_queued;
698 }
699 }
700 }
701 else
702 {
703 34706 desc->read_ready.store(true, std::memory_order_release);
704 }
705 }
706
707
2/2
✓ Branch 0 taken 86401 times.
✓ Branch 1 taken 2711 times.
89112 if (ev & EPOLLOUT)
708 {
709 86401 auto* conn_op = desc->connect_op.exchange(nullptr, std::memory_order_acq_rel);
710
2/2
✓ Branch 0 taken 2672 times.
✓ Branch 1 taken 83729 times.
86401 if (conn_op)
711 {
712
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 2671 times.
2672 if (err)
713 {
714 1 conn_op->complete(err, 0);
715 1 local_ops.push(conn_op);
716 1 ++completions_queued;
717 }
718 else
719 {
720 2671 conn_op->perform_io();
721
2/4
✓ Branch 0 taken 2671 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 2671 times.
2671 if (conn_op->errn == EAGAIN || conn_op->errn == EWOULDBLOCK)
722 {
723 conn_op->errn = 0;
724 desc->connect_op.store(conn_op, std::memory_order_release);
725 }
726 else
727 {
728 2671 local_ops.push(conn_op);
729 2671 ++completions_queued;
730 }
731 }
732 }
733
734 86401 auto* write_op = desc->write_op.exchange(nullptr, std::memory_order_acq_rel);
735
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 86401 times.
86401 if (write_op)
736 {
737 if (err)
738 {
739 write_op->complete(err, 0);
740 local_ops.push(write_op);
741 ++completions_queued;
742 }
743 else
744 {
745 write_op->perform_io();
746 if (write_op->errn == EAGAIN || write_op->errn == EWOULDBLOCK)
747 {
748 write_op->errn = 0;
749 desc->write_op.store(write_op, std::memory_order_release);
750 }
751 else
752 {
753 local_ops.push(write_op);
754 ++completions_queued;
755 }
756 }
757 }
758
759
3/4
✓ Branch 0 taken 83729 times.
✓ Branch 1 taken 2672 times.
✓ Branch 2 taken 83729 times.
✗ Branch 3 not taken.
86401 if (!conn_op && !write_op)
760 83729 desc->write_ready.store(true, std::memory_order_release);
761 }
762
763
3/4
✓ Branch 0 taken 47 times.
✓ Branch 1 taken 89065 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 47 times.
89112 if (err && !(ev & (EPOLLIN | EPOLLOUT)))
764 {
765 auto* read_op = desc->read_op.exchange(nullptr, std::memory_order_acq_rel);
766 if (read_op)
767 {
768 read_op->complete(err, 0);
769 local_ops.push(read_op);
770 ++completions_queued;
771 }
772
773 auto* write_op = desc->write_op.exchange(nullptr, std::memory_order_acq_rel);
774 if (write_op)
775 {
776 write_op->complete(err, 0);
777 local_ops.push(write_op);
778 ++completions_queued;
779 }
780
781 auto* conn_op = desc->connect_op.exchange(nullptr, std::memory_order_acq_rel);
782 if (conn_op)
783 {
784 conn_op->complete(err, 0);
785 local_ops.push(conn_op);
786 ++completions_queued;
787 }
788 }
789 }
790
791 // Process timers only when timerfd fires (like Asio's check_timers pattern)
792
2/2
✓ Branch 0 taken 2944 times.
✓ Branch 1 taken 84564 times.
87508 if (check_timers)
793 {
794
1/1
✓ Branch 1 taken 2944 times.
2944 timer_svc_->process_expired();
795
1/1
✓ Branch 1 taken 2944 times.
2944 update_timerfd();
796 }
797
798 // --- Acquire mutex only for queue operations ---
799
1/1
✓ Branch 1 taken 87508 times.
87508 lock.lock();
800
801
2/2
✓ Branch 1 taken 2763 times.
✓ Branch 2 taken 84745 times.
87508 if (!local_ops.empty())
802 2763 completed_ops_.splice(local_ops);
803
804 // Drain private queue (outstanding_work_ was already incremented when posting)
805
1/2
✓ Branch 1 taken 87508 times.
✗ Branch 2 not taken.
87508 if (auto* ctx = find_context(this))
806 {
807
2/2
✓ Branch 1 taken 81888 times.
✓ Branch 2 taken 5620 times.
87508 if (!ctx->private_queue.empty())
808 {
809 81888 completions_queued += ctx->private_outstanding_work;
810 81888 ctx->private_outstanding_work = 0;
811 81888 completed_ops_.splice(ctx->private_queue);
812 }
813 }
814
815 // Only wake threads that are actually idle, and only as many as there are queued items
816
3/4
✓ Branch 0 taken 84608 times.
✓ Branch 1 taken 2900 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 84608 times.
87508 if (completions_queued > 0 && idle_thread_count_ > 0)
817 {
818 int threads_to_wake = (std::min)(completions_queued, idle_thread_count_);
819 for (int i = 0; i < threads_to_wake; ++i)
820 wakeup_event_.notify_one();
821 }
822 87508 }
823
824 std::size_t
825 174585 epoll_scheduler::
826 do_one(long timeout_us)
827 {
828
1/1
✓ Branch 1 taken 174585 times.
174585 std::unique_lock lock(mutex_);
829
830 for (;;)
831 {
832
2/2
✓ Branch 1 taken 20 times.
✓ Branch 2 taken 262073 times.
262093 if (stopped_.load(std::memory_order_acquire))
833 20 return 0;
834
835 262073 scheduler_op* op = completed_ops_.pop();
836
837
2/2
✓ Branch 0 taken 87634 times.
✓ Branch 1 taken 174439 times.
262073 if (op == &task_op_)
838 {
839 // Check both global queue and private queue for pending handlers
840 87634 auto* ctx = find_context(this);
841
3/4
✓ Branch 1 taken 87486 times.
✓ Branch 2 taken 148 times.
✓ Branch 3 taken 87486 times.
✗ Branch 4 not taken.
175120 bool more_handlers = !completed_ops_.empty() ||
842
2/2
✓ Branch 1 taken 81673 times.
✓ Branch 2 taken 5813 times.
87486 (ctx && !ctx->private_queue.empty());
843
844
2/2
✓ Branch 0 taken 5813 times.
✓ Branch 1 taken 81821 times.
87634 if (!more_handlers)
845 {
846
2/2
✓ Branch 1 taken 126 times.
✓ Branch 2 taken 5687 times.
11626 if (outstanding_work_.load(std::memory_order_acquire) == 0)
847 {
848 126 completed_ops_.push(&task_op_);
849 126 return 0;
850 }
851
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5687 times.
5687 if (timeout_us == 0)
852 {
853 completed_ops_.push(&task_op_);
854 return 0;
855 }
856 }
857
858
3/4
✓ Branch 0 taken 5687 times.
✓ Branch 1 taken 81821 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 5687 times.
87508 reactor_interrupted_ = more_handlers || timeout_us == 0;
859 87508 reactor_running_ = true;
860
861
3/4
✓ Branch 0 taken 81821 times.
✓ Branch 1 taken 5687 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 81821 times.
87508 if (more_handlers && idle_thread_count_ > 0)
862 wakeup_event_.notify_one();
863
864
1/1
✓ Branch 1 taken 87508 times.
87508 run_reactor(lock);
865
866 87508 reactor_running_ = false;
867 87508 completed_ops_.push(&task_op_);
868 87508 continue;
869 87508 }
870
871
1/2
✓ Branch 0 taken 174439 times.
✗ Branch 1 not taken.
174439 if (op != nullptr)
872 {
873
1/1
✓ Branch 1 taken 174439 times.
174439 lock.unlock();
874 174439 work_guard g{this};
875
1/1
✓ Branch 1 taken 174439 times.
174439 (*op)();
876 174439 return 1;
877 174439 }
878
879 if (outstanding_work_.load(std::memory_order_acquire) == 0)
880 return 0;
881
882 if (timeout_us == 0)
883 return 0;
884
885 // Drain private queue before blocking (outstanding_work_ was already incremented)
886 if (auto* ctx = find_context(this))
887 {
888 if (!ctx->private_queue.empty())
889 {
890 ctx->private_outstanding_work = 0;
891 completed_ops_.splice(ctx->private_queue);
892 continue;
893 }
894 }
895
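        // Park as an idle worker; wake_one_thread_and_unlock() and
        // run_reactor() consult idle_thread_count_ when deciding whom to wake.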
896 ++idle_thread_count_;
897 if (timeout_us < 0)
898 wakeup_event_.wait(lock);
899 else
900 wakeup_event_.wait_for(lock, std::chrono::microseconds(timeout_us));
901 --idle_thread_count_;
902 87508 }
903 174585 }
904
905 } // namespace boost::corosio::detail
906
907 #endif
908