Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #include <boost/corosio/detail/platform.hpp>
11 :
12 : #if BOOST_COROSIO_HAS_EPOLL
13 :
14 : #include "src/detail/epoll/scheduler.hpp"
15 : #include "src/detail/epoll/op.hpp"
16 : #include "src/detail/make_err.hpp"
17 : #include "src/detail/posix/resolver_service.hpp"
18 : #include "src/detail/posix/signals.hpp"
19 :
20 : #include <boost/corosio/detail/except.hpp>
21 : #include <boost/corosio/detail/thread_local_ptr.hpp>
22 :
23 : #include <atomic>
24 : #include <chrono>
25 : #include <limits>
26 :
27 : #include <errno.h>
28 : #include <fcntl.h>
29 : #include <sys/epoll.h>
30 : #include <sys/eventfd.h>
31 : #include <sys/socket.h>
32 : #include <sys/timerfd.h>
33 : #include <unistd.h>
34 :
35 : /*
36 : epoll Scheduler - Single Reactor Model
37 : ======================================
38 :
39 : This scheduler coordinates its worker threads to provide handler
40 : parallelism while avoiding the thundering herd problem: instead of every
41 : thread blocking in epoll_wait(), a single thread acts as the "reactor"
42 : while the others wait on a condition variable for handler work.
43 :
44 : Thread Model
45 : ------------
46 : - ONE thread runs epoll_wait() at a time (the reactor thread)
47 : - OTHER threads wait on wakeup_event_ (condition variable) for handlers
48 : - When work is posted, exactly one waiting thread wakes via notify_one()
49 : - This matches Windows IOCP semantics where N posted items wake N threads
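For illustration only, a minimal sketch of a multi-threaded driver. The name
`sched` is an assumption standing for a reference to this scheduler obtained
from its owning execution context; run(), post() and stop() are the real
members defined in this file.

    std::vector<std::thread> workers;
    for (int i = 0; i < 4; ++i)
        workers.emplace_back([&] { sched.run(); });  // one of them becomes the reactor
    // ... sched.post(...) from any thread ...
    sched.stop();                                    // wake all threads; run() returns
    for (auto& t : workers)
        t.join();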
50 :
51 : Event Loop Structure (do_one)
52 : -----------------------------
53 : 1. Lock mutex, try to pop handler from queue
54 : 2. If got handler: execute it (unlocked), return
55 : 3. If queue empty and no reactor running: become reactor
56 : - Run epoll_wait (unlocked), queue I/O completions, loop back
57 : 4. If queue empty and reactor running: wait on condvar for work
58 :
59 : The reactor_running_ flag ensures only one thread owns epoll_wait().
60 : After the reactor queues I/O completions, it loops back to try getting
61 : a handler, giving priority to handler execution over more I/O polling.
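In condensed form (stop checks, timeouts, work counting, and the per-thread
private queue are omitted; the real logic is in do_one() below):

    std::unique_lock lock(mutex_);
    for (;;)
    {
        scheduler_op* op = completed_ops_.pop();
        if (op == &task_op_)                  // sentinel: the reactor slot is free
        {
            run_reactor(lock);                // epoll_wait + queue I/O completions
            completed_ops_.push(&task_op_);   // hand the slot back, retry for a handler
            continue;
        }
        if (op)                               // got a handler
        {
            lock.unlock();
            (*op)();                          // execute without the mutex
            return 1;
        }
        wakeup_event_.wait(lock);             // queue empty: wait for posted work
    }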
62 :
63 : Wake Coordination (wake_one_thread_and_unlock)
64 : ----------------------------------------------
65 : When posting work:
66 : - If idle threads exist: notify_one() wakes exactly one worker
67 : - Else if reactor running: interrupt via eventfd write
68 : - Else: no-op (thread will find work when it checks queue)
69 :
70 : This is critical for matching IOCP behavior. With the old model, posting
71 : N handlers would wake all threads (thundering herd). Now each post()
72 : wakes at most one thread, and that thread handles exactly one item.
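Condensed decision logic (the real code, which also releases the mutex in each
branch, is wake_one_thread_and_unlock() below):

    if (idle_thread_count_ > 0)
        wakeup_event_.notify_one();           // a worker is parked on the condvar
    else if (reactor_running_ && !reactor_interrupted_)
        interrupt_reactor();                  // eventfd write; epoll_wait returns
    // else: nothing is blocked, the item is found on the next queue check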
73 :
74 : Work Counting
75 : -------------
76 : outstanding_work_ tracks pending operations. When it hits zero, run()
77 : returns. Each operation increments on start, decrements on completion.
78 :
79 : Timer Integration
80 : -----------------
81 : Timers are owned by timer_service and delivered through a timerfd that is
82 : registered with epoll, so the reactor wakes when the nearest timer is due.
83 : When the earliest expiry changes, timer_service invokes the
84 : on_earliest_changed callback, which re-arms the timerfd via update_timerfd().
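Arming sketch, condensed from update_timerfd() below (flags == 0 means the
itimerspec value is relative to now; an all-zero value would disarm the fd):

    itimerspec ts{};
    auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
        nearest - std::chrono::steady_clock::now()).count();
    ts.it_value.tv_sec  = ns / 1000000000;
    ts.it_value.tv_nsec = ns % 1000000000;
    if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
        ts.it_value.tv_nsec = 1;              // never write all zeros
    ::timerfd_settime(timer_fd_, 0, &ts, nullptr);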
85 : */
86 :
87 : namespace boost::corosio::detail {
88 :
89 : namespace {
90 :
91 : struct scheduler_context
92 : {
93 : epoll_scheduler const* key;
94 : scheduler_context* next;
95 : op_queue private_queue;
96 : long private_outstanding_work;
97 :
98 155 : scheduler_context(epoll_scheduler const* k, scheduler_context* n)
99 155 : : key(k)
100 155 : , next(n)
101 155 : , private_outstanding_work(0)
102 : {
103 155 : }
104 : };
105 :
106 : corosio::detail::thread_local_ptr<scheduler_context> context_stack;
107 :
108 : struct thread_context_guard
109 : {
110 : scheduler_context frame_;
111 :
112 155 : explicit thread_context_guard(
113 : epoll_scheduler const* ctx) noexcept
114 155 : : frame_(ctx, context_stack.get())
115 : {
116 155 : context_stack.set(&frame_);
117 155 : }
118 :
119 155 : ~thread_context_guard() noexcept
120 : {
121 155 : if (!frame_.private_queue.empty())
122 0 : frame_.key->drain_thread_queue(frame_.private_queue, frame_.private_outstanding_work);
123 155 : context_stack.set(frame_.next);
124 155 : }
125 : };
126 :
127 : scheduler_context*
128 344147 : find_context(epoll_scheduler const* self) noexcept
129 : {
130 344147 : for (auto* c = context_stack.get(); c != nullptr; c = c->next)
131 342498 : if (c->key == self)
132 342498 : return c;
133 1649 : return nullptr;
134 : }
135 :
136 : } // namespace
137 :
138 184 : epoll_scheduler::
139 : epoll_scheduler(
140 : capy::execution_context& ctx,
141 184 : int)
142 184 : : epoll_fd_(-1)
143 184 : , event_fd_(-1)
144 184 : , timer_fd_(-1)
145 184 : , outstanding_work_(0)
146 184 : , stopped_(false)
147 184 : , shutdown_(false)
148 184 : , reactor_running_(false)
149 184 : , reactor_interrupted_(false)
150 368 : , idle_thread_count_(0)
151 : {
152 184 : epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
153 184 : if (epoll_fd_ < 0)
154 0 : detail::throw_system_error(make_err(errno), "epoll_create1");
155 :
156 184 : event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
157 184 : if (event_fd_ < 0)
158 : {
159 0 : int errn = errno;
160 0 : ::close(epoll_fd_);
161 0 : detail::throw_system_error(make_err(errn), "eventfd");
162 : }
163 :
164 184 : timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
165 184 : if (timer_fd_ < 0)
166 : {
167 0 : int errn = errno;
168 0 : ::close(event_fd_);
169 0 : ::close(epoll_fd_);
170 0 : detail::throw_system_error(make_err(errn), "timerfd_create");
171 : }
172 :
173 184 : epoll_event ev{};
174 184 : ev.events = EPOLLIN | EPOLLET;
175 184 : ev.data.ptr = nullptr;
176 184 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
177 : {
178 0 : int errn = errno;
179 0 : ::close(timer_fd_);
180 0 : ::close(event_fd_);
181 0 : ::close(epoll_fd_);
182 0 : detail::throw_system_error(make_err(errn), "epoll_ctl");
183 : }
184 :
185 184 : epoll_event timer_ev{};
186 184 : timer_ev.events = EPOLLIN | EPOLLERR;
187 184 : timer_ev.data.ptr = &timer_fd_;
188 184 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
189 : {
190 0 : int errn = errno;
191 0 : ::close(timer_fd_);
192 0 : ::close(event_fd_);
193 0 : ::close(epoll_fd_);
194 0 : detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
195 : }
196 :
197 184 : timer_svc_ = &get_timer_service(ctx, *this);
198 184 : timer_svc_->set_on_earliest_changed(
199 : timer_service::callback(
200 : this,
201 2957 : [](void* p) { static_cast<epoll_scheduler*>(p)->update_timerfd(); }));
202 :
203 : // Initialize resolver service
204 184 : get_resolver_service(ctx, *this);
205 :
206 : // Initialize signal service
207 184 : get_signal_service(ctx, *this);
208 :
209 : // Push task sentinel to interleave reactor runs with handler execution
210 184 : completed_ops_.push(&task_op_);
211 184 : }
212 :
213 368 : epoll_scheduler::
214 184 : ~epoll_scheduler()
215 : {
216 184 : if (timer_fd_ >= 0)
217 184 : ::close(timer_fd_);
218 184 : if (event_fd_ >= 0)
219 184 : ::close(event_fd_);
220 184 : if (epoll_fd_ >= 0)
221 184 : ::close(epoll_fd_);
222 368 : }
223 :
224 : void
225 184 : epoll_scheduler::
226 : shutdown()
227 : {
228 : {
229 184 : std::unique_lock lock(mutex_);
230 184 : shutdown_ = true;
231 :
232 368 : while (auto* h = completed_ops_.pop())
233 : {
234 184 : if (h == &task_op_)
235 184 : continue;
236 0 : lock.unlock();
237 0 : h->destroy();
238 0 : lock.lock();
239 184 : }
240 184 : }
241 :
242 184 : outstanding_work_.store(0, std::memory_order_release);
243 :
244 184 : if (event_fd_ >= 0)
245 184 : interrupt_reactor();
246 :
247 184 : wakeup_event_.notify_all();
248 184 : }
249 :
250 : void
251 1639 : epoll_scheduler::
252 : post(capy::coro h) const
253 : {
254 : struct post_handler final
255 : : scheduler_op
256 : {
257 : capy::coro h_;
258 :
259 : explicit
260 1639 : post_handler(capy::coro h)
261 1639 : : h_(h)
262 : {
263 1639 : }
264 :
265 3278 : ~post_handler() = default;
266 :
267 1639 : void operator()() override
268 : {
269 1639 : auto h = h_;
270 1639 : delete this;
271 : std::atomic_thread_fence(std::memory_order_acquire);
272 1639 : h.resume();
273 1639 : }
274 :
275 0 : void destroy() override
276 : {
277 0 : delete this;
278 0 : }
279 : };
280 :
281 1639 : auto ph = std::make_unique<post_handler>(h);
282 :
283 : // Fast path: same thread posts to private queue without locking
284 1639 : if (auto* ctx = find_context(this))
285 : {
286 16 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
287 16 : ++ctx->private_outstanding_work;
288 16 : ctx->private_queue.push(ph.release());
289 16 : return;
290 : }
291 :
292 : // Slow path: cross-thread post requires mutex
293 1623 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
294 :
295 1623 : std::unique_lock lock(mutex_);
296 1623 : completed_ops_.push(ph.release());
297 1623 : wake_one_thread_and_unlock(lock);
298 1639 : }
299 :
300 : void
301 167366 : epoll_scheduler::
302 : post(scheduler_op* h) const
303 : {
304 : // Fast path: same thread posts to private queue without locking
305 167366 : if (auto* ctx = find_context(this))
306 : {
307 167340 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
308 167340 : ++ctx->private_outstanding_work;
309 167340 : ctx->private_queue.push(h);
310 167340 : return;
311 : }
312 :
313 : // Slow path: cross-thread post requires mutex
314 26 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
315 :
316 26 : std::unique_lock lock(mutex_);
317 26 : completed_ops_.push(h);
318 26 : wake_one_thread_and_unlock(lock);
319 26 : }
320 :
321 : void
322 2980 : epoll_scheduler::
323 : on_work_started() noexcept
324 : {
325 2980 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
326 2980 : }
327 :
328 : void
329 2948 : epoll_scheduler::
330 : on_work_finished() noexcept
331 : {
332 5896 : if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
333 18 : stop();
334 2948 : }
335 :
336 : bool
337 3193 : epoll_scheduler::
338 : running_in_this_thread() const noexcept
339 : {
340 3193 : for (auto* c = context_stack.get(); c != nullptr; c = c->next)
341 2983 : if (c->key == this)
342 2983 : return true;
343 210 : return false;
344 : }
345 :
346 : void
347 37 : epoll_scheduler::
348 : stop()
349 : {
350 37 : bool expected = false;
351 37 : if (stopped_.compare_exchange_strong(expected, true,
352 : std::memory_order_release, std::memory_order_relaxed))
353 : {
354 : // Wake all threads so they notice stopped_ and exit
355 : {
356 37 : std::lock_guard lock(mutex_);
357 37 : wakeup_event_.notify_all();
358 37 : }
359 37 : interrupt_reactor();
360 : }
361 37 : }
362 :
363 : bool
364 16 : epoll_scheduler::
365 : stopped() const noexcept
366 : {
367 16 : return stopped_.load(std::memory_order_acquire);
368 : }
369 :
370 : void
371 49 : epoll_scheduler::
372 : restart()
373 : {
374 49 : stopped_.store(false, std::memory_order_release);
375 49 : }
376 :
377 : std::size_t
378 175 : epoll_scheduler::
379 : run()
380 : {
381 175 : if (stopped_.load(std::memory_order_acquire))
382 20 : return 0;
383 :
384 310 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
385 : {
386 11 : stop();
387 11 : return 0;
388 : }
389 :
390 144 : thread_context_guard ctx(this);
391 :
392 144 : std::size_t n = 0;
393 174572 : while (do_one(-1))
394 174428 : if (n != (std::numeric_limits<std::size_t>::max)())
395 174428 : ++n;
396 144 : return n;
397 144 : }
398 :
399 : std::size_t
400 2 : epoll_scheduler::
401 : run_one()
402 : {
403 2 : if (stopped_.load(std::memory_order_acquire))
404 0 : return 0;
405 :
406 4 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
407 : {
408 0 : stop();
409 0 : return 0;
410 : }
411 :
412 2 : thread_context_guard ctx(this);
413 2 : return do_one(-1);
414 2 : }
415 :
416 : std::size_t
417 10 : epoll_scheduler::
418 : wait_one(long usec)
419 : {
420 10 : if (stopped_.load(std::memory_order_acquire))
421 0 : return 0;
422 :
423 20 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
424 : {
425 4 : stop();
426 4 : return 0;
427 : }
428 :
429 6 : thread_context_guard ctx(this);
430 6 : return do_one(usec);
431 6 : }
432 :
433 : std::size_t
434 2 : epoll_scheduler::
435 : poll()
436 : {
437 2 : if (stopped_.load(std::memory_order_acquire))
438 0 : return 0;
439 :
440 4 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
441 : {
442 1 : stop();
443 1 : return 0;
444 : }
445 :
446 1 : thread_context_guard ctx(this);
447 :
448 1 : std::size_t n = 0;
449 3 : while (do_one(0))
450 2 : if (n != (std::numeric_limits<std::size_t>::max)())
451 2 : ++n;
452 1 : return n;
453 1 : }
454 :
455 : std::size_t
456 4 : epoll_scheduler::
457 : poll_one()
458 : {
459 4 : if (stopped_.load(std::memory_order_acquire))
460 0 : return 0;
461 :
462 8 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
463 : {
464 2 : stop();
465 2 : return 0;
466 : }
467 :
468 2 : thread_context_guard ctx(this);
469 2 : return do_one(0);
470 2 : }
471 :
472 : void
473 5496 : epoll_scheduler::
474 : register_descriptor(int fd, descriptor_data* desc) const
475 : {
476 5496 : epoll_event ev{};
477 5496 : ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
478 5496 : ev.data.ptr = desc;
479 :
480 5496 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
481 0 : detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
482 :
483 5496 : desc->registered_events = ev.events;
484 5496 : desc->is_registered = true;
485 5496 : desc->fd = fd;
486 5496 : desc->read_ready.store(false, std::memory_order_relaxed);
487 5496 : desc->write_ready.store(false, std::memory_order_relaxed);
488 5496 : }
489 :
490 : void
491 0 : epoll_scheduler::
492 : update_descriptor_events(int, descriptor_data*, std::uint32_t) const
493 : {
494 : // Provides memory fence for operation pointer visibility across threads
495 : std::atomic_thread_fence(std::memory_order_seq_cst);
496 0 : }
497 :
498 : void
499 5496 : epoll_scheduler::
500 : deregister_descriptor(int fd) const
501 : {
502 5496 : ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
503 5496 : }
504 :
505 : void
506 5604 : epoll_scheduler::
507 : work_started() const noexcept
508 : {
509 5604 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
510 5604 : }
511 :
512 : void
513 174641 : epoll_scheduler::
514 : work_finished() const noexcept
515 : {
516 349282 : if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
517 : {
518 : // Last work item completed - wake all threads so they can exit.
519 : // notify_all() wakes threads waiting on the condvar.
520 : // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
521 : // Both are needed because they target different blocking mechanisms.
522 130 : std::unique_lock lock(mutex_);
523 130 : wakeup_event_.notify_all();
524 130 : if (reactor_running_ && !reactor_interrupted_)
525 : {
526 0 : reactor_interrupted_ = true;
527 0 : lock.unlock();
528 0 : interrupt_reactor();
529 : }
530 130 : }
531 174641 : }
532 :
533 : void
534 0 : epoll_scheduler::
535 : drain_thread_queue(op_queue& queue, long count) const
536 : {
537 0 : std::lock_guard lock(mutex_);
538 : // Note: outstanding_work_ was already incremented when posting
539 0 : completed_ops_.splice(queue);
540 0 : if (count > 0)
541 0 : wakeup_event_.notify_all();
542 0 : }
543 :
544 : void
545 247 : epoll_scheduler::
546 : interrupt_reactor() const
547 : {
548 : // Only write if not already armed to avoid redundant writes
549 247 : bool expected = false;
550 247 : if (eventfd_armed_.compare_exchange_strong(expected, true,
551 : std::memory_order_release, std::memory_order_relaxed))
552 : {
553 216 : std::uint64_t val = 1;
554 216 : [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
555 : }
556 247 : }
557 :
558 : void
559 1649 : epoll_scheduler::
560 : wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
561 : {
562 1649 : if (idle_thread_count_ > 0)
563 : {
564 0 : wakeup_event_.notify_one();
565 0 : lock.unlock();
566 : }
567 1649 : else if (reactor_running_ && !reactor_interrupted_)
568 : {
569 26 : reactor_interrupted_ = true;
570 26 : lock.unlock();
571 26 : interrupt_reactor();
572 : }
573 : else
574 : {
575 1623 : lock.unlock();
576 : }
577 1649 : }
578 :
579 : struct work_guard
580 : {
581 : epoll_scheduler const* self;
582 0 : ~work_guard() { self->work_finished(); }
583 : };
584 :
585 : void
586 5901 : epoll_scheduler::
587 : update_timerfd() const
588 : {
589 5901 : auto nearest = timer_svc_->nearest_expiry();
590 :
591 5901 : itimerspec ts{};
592 5901 : int flags = 0;
593 :
594 5901 : if (nearest == timer_service::time_point::max())
595 : {
596 : // No timers - disarm by setting to 0 (relative)
597 : }
598 : else
599 : {
600 5861 : auto now = std::chrono::steady_clock::now();
601 5861 : if (nearest <= now)
602 : {
603 : // Use 1ns instead of 0 - zero disarms the timerfd
604 71 : ts.it_value.tv_nsec = 1;
605 : }
606 : else
607 : {
608 5790 : auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
609 11580 : nearest - now).count();
610 5790 : ts.it_value.tv_sec = nsec / 1000000000;
611 5790 : ts.it_value.tv_nsec = nsec % 1000000000;
612 : // Ensure non-zero to avoid disarming if duration rounds to 0
613 5790 : if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
614 0 : ts.it_value.tv_nsec = 1;
615 : }
616 : }
617 :
618 5901 : if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
619 0 : detail::throw_system_error(make_err(errno), "timerfd_settime");
620 5901 : }
621 :
622 : void
623 87508 : epoll_scheduler::
624 : run_reactor(std::unique_lock<std::mutex>& lock)
625 : {
626 87508 : int timeout_ms = reactor_interrupted_ ? 0 : -1;
627 :
628 87508 : lock.unlock();
629 :
630 : // --- Event loop runs WITHOUT the mutex (like Asio) ---
631 :
632 : epoll_event events[128];
633 87508 : int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
634 87508 : int saved_errno = errno;
635 :
636 87508 : if (nfds < 0 && saved_errno != EINTR)
637 0 : detail::throw_system_error(make_err(saved_errno), "epoll_wait");
638 :
639 87508 : bool check_timers = false;
640 87508 : op_queue local_ops;
641 87508 : int completions_queued = 0;
642 :
643 : // Process events without holding the mutex
644 179596 : for (int i = 0; i < nfds; ++i)
645 : {
646 92088 : if (events[i].data.ptr == nullptr)
647 : {
648 : std::uint64_t val;
649 32 : [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
650 32 : eventfd_armed_.store(false, std::memory_order_relaxed);
651 32 : continue;
652 32 : }
653 :
654 92056 : if (events[i].data.ptr == &timer_fd_)
655 : {
656 : std::uint64_t expirations;
657 2944 : [[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations));
658 2944 : check_timers = true;
659 2944 : continue;
660 2944 : }
661 :
662 89112 : auto* desc = static_cast<descriptor_data*>(events[i].data.ptr);
663 89112 : std::uint32_t ev = events[i].events;
664 89112 : int err = 0;
665 :
666 89112 : if (ev & (EPOLLERR | EPOLLHUP))
667 : {
668 47 : socklen_t len = sizeof(err);
669 47 : if (::getsockopt(desc->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
670 0 : err = errno;
671 47 : if (err == 0)
672 46 : err = EIO;
673 : }
674 :
675 89112 : if (ev & EPOLLIN)
676 : {
677 37468 : auto* op = desc->read_op.exchange(nullptr, std::memory_order_acq_rel);
678 37468 : if (op)
679 : {
680 2762 : if (err)
681 : {
682 0 : op->complete(err, 0);
683 0 : local_ops.push(op);
684 0 : ++completions_queued;
685 : }
686 : else
687 : {
688 2762 : op->perform_io();
689 2762 : if (op->errn == EAGAIN || op->errn == EWOULDBLOCK)
690 : {
691 0 : op->errn = 0;
692 0 : desc->read_op.store(op, std::memory_order_release);
693 : }
694 : else
695 : {
696 2762 : local_ops.push(op);
697 2762 : ++completions_queued;
698 : }
699 : }
700 : }
701 : else
702 : {
703 34706 : desc->read_ready.store(true, std::memory_order_release);
704 : }
705 : }
706 :
707 89112 : if (ev & EPOLLOUT)
708 : {
709 86401 : auto* conn_op = desc->connect_op.exchange(nullptr, std::memory_order_acq_rel);
710 86401 : if (conn_op)
711 : {
712 2672 : if (err)
713 : {
714 1 : conn_op->complete(err, 0);
715 1 : local_ops.push(conn_op);
716 1 : ++completions_queued;
717 : }
718 : else
719 : {
720 2671 : conn_op->perform_io();
721 2671 : if (conn_op->errn == EAGAIN || conn_op->errn == EWOULDBLOCK)
722 : {
723 0 : conn_op->errn = 0;
724 0 : desc->connect_op.store(conn_op, std::memory_order_release);
725 : }
726 : else
727 : {
728 2671 : local_ops.push(conn_op);
729 2671 : ++completions_queued;
730 : }
731 : }
732 : }
733 :
734 86401 : auto* write_op = desc->write_op.exchange(nullptr, std::memory_order_acq_rel);
735 86401 : if (write_op)
736 : {
737 0 : if (err)
738 : {
739 0 : write_op->complete(err, 0);
740 0 : local_ops.push(write_op);
741 0 : ++completions_queued;
742 : }
743 : else
744 : {
745 0 : write_op->perform_io();
746 0 : if (write_op->errn == EAGAIN || write_op->errn == EWOULDBLOCK)
747 : {
748 0 : write_op->errn = 0;
749 0 : desc->write_op.store(write_op, std::memory_order_release);
750 : }
751 : else
752 : {
753 0 : local_ops.push(write_op);
754 0 : ++completions_queued;
755 : }
756 : }
757 : }
758 :
759 86401 : if (!conn_op && !write_op)
760 83729 : desc->write_ready.store(true, std::memory_order_release);
761 : }
762 :
763 89112 : if (err && !(ev & (EPOLLIN | EPOLLOUT)))
764 : {
765 0 : auto* read_op = desc->read_op.exchange(nullptr, std::memory_order_acq_rel);
766 0 : if (read_op)
767 : {
768 0 : read_op->complete(err, 0);
769 0 : local_ops.push(read_op);
770 0 : ++completions_queued;
771 : }
772 :
773 0 : auto* write_op = desc->write_op.exchange(nullptr, std::memory_order_acq_rel);
774 0 : if (write_op)
775 : {
776 0 : write_op->complete(err, 0);
777 0 : local_ops.push(write_op);
778 0 : ++completions_queued;
779 : }
780 :
781 0 : auto* conn_op = desc->connect_op.exchange(nullptr, std::memory_order_acq_rel);
782 0 : if (conn_op)
783 : {
784 0 : conn_op->complete(err, 0);
785 0 : local_ops.push(conn_op);
786 0 : ++completions_queued;
787 : }
788 : }
789 : }
790 :
791 : // Process timers only when timerfd fires (like Asio's check_timers pattern)
792 87508 : if (check_timers)
793 : {
794 2944 : timer_svc_->process_expired();
795 2944 : update_timerfd();
796 : }
797 :
798 : // --- Acquire mutex only for queue operations ---
799 87508 : lock.lock();
800 :
801 87508 : if (!local_ops.empty())
802 2763 : completed_ops_.splice(local_ops);
803 :
804 : // Drain private queue (outstanding_work_ was already incremented when posting)
805 87508 : if (auto* ctx = find_context(this))
806 : {
807 87508 : if (!ctx->private_queue.empty())
808 : {
809 81888 : completions_queued += ctx->private_outstanding_work;
810 81888 : ctx->private_outstanding_work = 0;
811 81888 : completed_ops_.splice(ctx->private_queue);
812 : }
813 : }
814 :
815 : // Only wake threads that are actually idle, and only as many as we have work
816 87508 : if (completions_queued > 0 && idle_thread_count_ > 0)
817 : {
818 0 : int threads_to_wake = (std::min)(completions_queued, idle_thread_count_);
819 0 : for (int i = 0; i < threads_to_wake; ++i)
820 0 : wakeup_event_.notify_one();
821 : }
822 87508 : }
823 :
824 : std::size_t
825 174585 : epoll_scheduler::
826 : do_one(long timeout_us)
827 : {
828 174585 : std::unique_lock lock(mutex_);
829 :
830 : for (;;)
831 : {
832 262093 : if (stopped_.load(std::memory_order_acquire))
833 20 : return 0;
834 :
835 262073 : scheduler_op* op = completed_ops_.pop();
836 :
837 262073 : if (op == &task_op_)
838 : {
839 : // Check both global queue and private queue for pending handlers
840 87634 : auto* ctx = find_context(this);
841 175120 : bool more_handlers = !completed_ops_.empty() ||
842 87486 : (ctx && !ctx->private_queue.empty());
843 :
844 87634 : if (!more_handlers)
845 : {
846 11626 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
847 : {
848 126 : completed_ops_.push(&task_op_);
849 126 : return 0;
850 : }
851 5687 : if (timeout_us == 0)
852 : {
853 0 : completed_ops_.push(&task_op_);
854 0 : return 0;
855 : }
856 : }
857 :
858 87508 : reactor_interrupted_ = more_handlers || timeout_us == 0;
859 87508 : reactor_running_ = true;
860 :
861 87508 : if (more_handlers && idle_thread_count_ > 0)
862 0 : wakeup_event_.notify_one();
863 :
864 87508 : run_reactor(lock);
865 :
866 87508 : reactor_running_ = false;
867 87508 : completed_ops_.push(&task_op_);
868 87508 : continue;
869 87508 : }
870 :
871 174439 : if (op != nullptr)
872 : {
873 174439 : lock.unlock();
874 174439 : work_guard g{this};
875 174439 : (*op)();
876 174439 : return 1;
877 174439 : }
878 :
879 0 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
880 0 : return 0;
881 :
882 0 : if (timeout_us == 0)
883 0 : return 0;
884 :
885 : // Drain private queue before blocking (outstanding_work_ was already incremented)
886 0 : if (auto* ctx = find_context(this))
887 : {
888 0 : if (!ctx->private_queue.empty())
889 : {
890 0 : ctx->private_outstanding_work = 0;
891 0 : completed_ops_.splice(ctx->private_queue);
892 0 : continue;
893 : }
894 : }
895 :
896 0 : ++idle_thread_count_;
897 0 : if (timeout_us < 0)
898 0 : wakeup_event_.wait(lock);
899 : else
900 0 : wakeup_event_.wait_for(lock, std::chrono::microseconds(timeout_us));
901 0 : --idle_thread_count_;
902 87508 : }
903 174585 : }
904 :
905 : } // namespace boost::corosio::detail
906 :
907 : #endif