Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #include <boost/corosio/detail/platform.hpp>
11 :
12 : #if BOOST_COROSIO_HAS_SELECT
13 :
14 : #include "src/detail/select/scheduler.hpp"
15 : #include "src/detail/select/op.hpp"
16 : #include "src/detail/make_err.hpp"
17 : #include "src/detail/posix/resolver_service.hpp"
18 : #include "src/detail/posix/signals.hpp"
19 :
20 : #include <boost/corosio/detail/except.hpp>
21 : #include <boost/corosio/detail/thread_local_ptr.hpp>
22 :
23 : #include <chrono>
24 : #include <limits>
25 :
26 : #include <errno.h>
27 : #include <fcntl.h>
28 : #include <sys/select.h>
29 : #include <sys/socket.h>
30 : #include <unistd.h>
31 :
32 : /*
33 : select Scheduler - Single Reactor Model
34 : =======================================
35 :
36 : This scheduler mirrors the epoll_scheduler design but uses select() instead
37 : of epoll for I/O multiplexing. The thread coordination strategy is identical:
38 : one thread becomes the "reactor" while others wait on a condition variable.
39 :
40 : Thread Model
41 : ------------
42 : - ONE thread runs select() at a time (the reactor thread)
43 : - OTHER threads wait on wakeup_event_ (condition variable) for handlers
44 : - When work is posted, exactly one waiting thread wakes via notify_one()
45 :
46 : Key Differences from epoll
47 : --------------------------
48 : - Uses self-pipe instead of eventfd for interruption (more portable)
49 : - fd_set rebuilding each iteration (O(n) vs O(1) for epoll)
50 : - FD_SETSIZE limit (~1024 fds on most systems)
51 : - Level-triggered only (no edge-triggered mode)
52 :
53 : Self-Pipe Pattern
54 : -----------------
55 : To interrupt a blocking select() call (e.g., when work is posted or a timer
56 : expires), we write a byte to pipe_fds_[1]. The read end pipe_fds_[0] is
57 : always in the read_fds set, so select() returns immediately. We drain the
58 : pipe to clear the readable state.
59 :
60 : fd-to-op Mapping
61 : ----------------
62 : We use an unordered_map<int, fd_state> to track which operations are
63 : registered for each fd. This allows O(1) lookup when select() returns
64 : ready fds. Each fd can have at most one read op and one write op registered.
65 : */
66 :
67 : namespace boost::corosio::detail {
68 :
69 : namespace {
70 :
71 : struct scheduler_context
72 : {
73 : select_scheduler const* key;
74 : scheduler_context* next;
75 : };
76 :
77 : corosio::detail::thread_local_ptr<scheduler_context> context_stack;
78 :
79 : struct thread_context_guard
80 : {
81 : scheduler_context frame_;
82 :
83 92 : explicit thread_context_guard(
84 : select_scheduler const* ctx) noexcept
85 92 : : frame_{ctx, context_stack.get()}
86 : {
87 92 : context_stack.set(&frame_);
88 92 : }
89 :
90 92 : ~thread_context_guard() noexcept
91 : {
92 92 : context_stack.set(frame_.next);
93 92 : }
94 : };
95 :
96 : } // namespace
97 :
98 120 : select_scheduler::
99 : select_scheduler(
100 : capy::execution_context& ctx,
101 120 : int)
102 120 : : pipe_fds_{-1, -1}
103 120 : , outstanding_work_(0)
104 120 : , stopped_(false)
105 120 : , shutdown_(false)
106 120 : , max_fd_(-1)
107 120 : , reactor_running_(false)
108 120 : , reactor_interrupted_(false)
109 240 : , idle_thread_count_(0)
110 : {
111 : // Create self-pipe for interrupting select()
112 120 : if (::pipe(pipe_fds_) < 0)
113 0 : detail::throw_system_error(make_err(errno), "pipe");
114 :
115 : // Set both ends to non-blocking and close-on-exec
116 360 : for (int i = 0; i < 2; ++i)
117 : {
118 240 : int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
119 240 : if (flags == -1)
120 : {
121 0 : int errn = errno;
122 0 : ::close(pipe_fds_[0]);
123 0 : ::close(pipe_fds_[1]);
124 0 : detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
125 : }
126 240 : if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
127 : {
128 0 : int errn = errno;
129 0 : ::close(pipe_fds_[0]);
130 0 : ::close(pipe_fds_[1]);
131 0 : detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
132 : }
133 240 : if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
134 : {
135 0 : int errn = errno;
136 0 : ::close(pipe_fds_[0]);
137 0 : ::close(pipe_fds_[1]);
138 0 : detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
139 : }
140 : }
141 :
142 120 : timer_svc_ = &get_timer_service(ctx, *this);
143 120 : timer_svc_->set_on_earliest_changed(
144 : timer_service::callback(
145 : this,
146 2423 : [](void* p) { static_cast<select_scheduler*>(p)->interrupt_reactor(); }));
147 :
148 : // Initialize resolver service
149 120 : get_resolver_service(ctx, *this);
150 :
151 : // Initialize signal service
152 120 : get_signal_service(ctx, *this);
153 :
154 : // Push task sentinel to interleave reactor runs with handler execution
155 120 : completed_ops_.push(&task_op_);
156 120 : }
157 :
158 240 : select_scheduler::
159 120 : ~select_scheduler()
160 : {
161 120 : if (pipe_fds_[0] >= 0)
162 120 : ::close(pipe_fds_[0]);
163 120 : if (pipe_fds_[1] >= 0)
164 120 : ::close(pipe_fds_[1]);
165 240 : }
166 :
167 : void
168 120 : select_scheduler::
169 : shutdown()
170 : {
171 : {
172 120 : std::unique_lock lock(mutex_);
173 120 : shutdown_ = true;
174 :
175 240 : while (auto* h = completed_ops_.pop())
176 : {
177 120 : if (h == &task_op_)
178 120 : continue;
179 0 : lock.unlock();
180 0 : h->destroy();
181 0 : lock.lock();
182 120 : }
183 120 : }
184 :
185 120 : outstanding_work_.store(0, std::memory_order_release);
186 :
187 120 : if (pipe_fds_[1] >= 0)
188 120 : interrupt_reactor();
189 :
190 120 : wakeup_event_.notify_all();
191 120 : }
192 :
193 : void
194 168 : select_scheduler::
195 : post(capy::coro h) const
196 : {
197 : struct post_handler final
198 : : scheduler_op
199 : {
200 : capy::coro h_;
201 :
202 : explicit
203 168 : post_handler(capy::coro h)
204 168 : : h_(h)
205 : {
206 168 : }
207 :
208 336 : ~post_handler() = default;
209 :
210 168 : void operator()() override
211 : {
212 168 : auto h = h_;
213 168 : delete this;
214 168 : h.resume();
215 168 : }
216 :
217 0 : void destroy() override
218 : {
219 0 : delete this;
220 0 : }
221 : };
222 :
223 168 : auto ph = std::make_unique<post_handler>(h);
224 168 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
225 :
226 168 : std::unique_lock lock(mutex_);
227 168 : completed_ops_.push(ph.release());
228 168 : wake_one_thread_and_unlock(lock);
229 168 : }
230 :
231 : void
232 162842 : select_scheduler::
233 : post(scheduler_op* h) const
234 : {
235 162842 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
236 :
237 162842 : std::unique_lock lock(mutex_);
238 162842 : completed_ops_.push(h);
239 162842 : wake_one_thread_and_unlock(lock);
240 162842 : }
241 :
242 : void
243 2420 : select_scheduler::
244 : on_work_started() noexcept
245 : {
246 2420 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
247 2420 : }
248 :
249 : void
250 2414 : select_scheduler::
251 : on_work_finished() noexcept
252 : {
253 4828 : if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
254 18 : stop();
255 2414 : }
256 :
257 : bool
258 167632 : select_scheduler::
259 : running_in_this_thread() const noexcept
260 : {
261 167632 : for (auto* c = context_stack.get(); c != nullptr; c = c->next)
262 167477 : if (c->key == this)
263 167477 : return true;
264 155 : return false;
265 : }
266 :
267 : void
268 18 : select_scheduler::
269 : stop()
270 : {
271 18 : bool expected = false;
272 18 : if (stopped_.compare_exchange_strong(expected, true,
273 : std::memory_order_release, std::memory_order_relaxed))
274 : {
275 : // Wake all threads so they notice stopped_ and exit
276 : {
277 18 : std::lock_guard lock(mutex_);
278 18 : wakeup_event_.notify_all();
279 18 : }
280 18 : interrupt_reactor();
281 : }
282 18 : }
283 :
284 : bool
285 1 : select_scheduler::
286 : stopped() const noexcept
287 : {
288 1 : return stopped_.load(std::memory_order_acquire);
289 : }
290 :
291 : void
292 34 : select_scheduler::
293 : restart()
294 : {
295 34 : stopped_.store(false, std::memory_order_release);
296 34 : }
297 :
298 : std::size_t
299 89 : select_scheduler::
300 : run()
301 : {
302 89 : if (stopped_.load(std::memory_order_acquire))
303 0 : return 0;
304 :
305 178 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
306 : {
307 0 : stop();
308 0 : return 0;
309 : }
310 :
311 89 : thread_context_guard ctx(this);
312 :
313 89 : std::size_t n = 0;
314 167501 : while (do_one(-1))
315 167412 : if (n != (std::numeric_limits<std::size_t>::max)())
316 167412 : ++n;
317 89 : return n;
318 89 : }
319 :
320 : std::size_t
321 0 : select_scheduler::
322 : run_one()
323 : {
324 0 : if (stopped_.load(std::memory_order_acquire))
325 0 : return 0;
326 :
327 0 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
328 : {
329 0 : stop();
330 0 : return 0;
331 : }
332 :
333 0 : thread_context_guard ctx(this);
334 0 : return do_one(-1);
335 0 : }
336 :
337 : std::size_t
338 3 : select_scheduler::
339 : wait_one(long usec)
340 : {
341 3 : if (stopped_.load(std::memory_order_acquire))
342 0 : return 0;
343 :
344 6 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
345 : {
346 0 : stop();
347 0 : return 0;
348 : }
349 :
350 3 : thread_context_guard ctx(this);
351 3 : return do_one(usec);
352 3 : }
353 :
354 : std::size_t
355 0 : select_scheduler::
356 : poll()
357 : {
358 0 : if (stopped_.load(std::memory_order_acquire))
359 0 : return 0;
360 :
361 0 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
362 : {
363 0 : stop();
364 0 : return 0;
365 : }
366 :
367 0 : thread_context_guard ctx(this);
368 :
369 0 : std::size_t n = 0;
370 0 : while (do_one(0))
371 0 : if (n != (std::numeric_limits<std::size_t>::max)())
372 0 : ++n;
373 0 : return n;
374 0 : }
375 :
376 : std::size_t
377 0 : select_scheduler::
378 : poll_one()
379 : {
380 0 : if (stopped_.load(std::memory_order_acquire))
381 0 : return 0;
382 :
383 0 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
384 : {
385 0 : stop();
386 0 : return 0;
387 : }
388 :
389 0 : thread_context_guard ctx(this);
390 0 : return do_one(0);
391 0 : }
392 :
393 : void
394 4523 : select_scheduler::
395 : register_fd(int fd, select_op* op, int events) const
396 : {
397 : // Validate fd is within select() limits
398 4523 : if (fd < 0 || fd >= FD_SETSIZE)
399 0 : detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
400 :
401 : {
402 4523 : std::lock_guard lock(mutex_);
403 :
404 4523 : auto& state = registered_fds_[fd];
405 4523 : if (events & event_read)
406 2343 : state.read_op = op;
407 4523 : if (events & event_write)
408 2180 : state.write_op = op;
409 :
410 4523 : if (fd > max_fd_)
411 184 : max_fd_ = fd;
412 4523 : }
413 :
414 : // Wake the reactor so a thread blocked in select() rebuilds its fd_sets
415 : // with the newly registered fd.
416 4523 : interrupt_reactor();
417 4523 : }
418 :
419 : void
420 4530 : select_scheduler::
421 : deregister_fd(int fd, int events) const
422 : {
423 4530 : std::lock_guard lock(mutex_);
424 :
425 4530 : auto it = registered_fds_.find(fd);
426 4530 : if (it == registered_fds_.end())
427 4411 : return;
428 :
429 119 : if (events & event_read)
430 119 : it->second.read_op = nullptr;
431 119 : if (events & event_write)
432 0 : it->second.write_op = nullptr;
433 :
434 : // Remove entry if both are null
435 119 : if (!it->second.read_op && !it->second.write_op)
436 : {
437 119 : registered_fds_.erase(it);
438 :
439 : // Recalculate max_fd_ if needed
440 119 : if (fd == max_fd_)
441 : {
442 118 : max_fd_ = pipe_fds_[0]; // At minimum, the pipe read end
443 118 : for (auto& [registered_fd, state] : registered_fds_)
444 : {
445 0 : if (registered_fd > max_fd_)
446 0 : max_fd_ = registered_fd;
447 : }
448 : }
449 : }
450 4530 : }
451 :
452 : void
453 4523 : select_scheduler::
454 : work_started() const noexcept
455 : {
456 4523 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
457 4523 : }
458 :
459 : void
460 167539 : select_scheduler::
461 : work_finished() const noexcept
462 : {
463 335078 : if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
464 : {
465 : // Last work item completed - wake all threads so they can exit.
466 72 : std::unique_lock lock(mutex_);
467 72 : wakeup_event_.notify_all();
468 72 : if (reactor_running_ && !reactor_interrupted_)
469 : {
470 0 : reactor_interrupted_ = true;
471 0 : lock.unlock();
472 0 : interrupt_reactor();
473 : }
474 72 : }
475 167539 : }
476 :
477 : void
478 7292 : select_scheduler::
479 : interrupt_reactor() const
480 : {
481 7292 : char byte = 1;
482 7292 : [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
483 7292 : }
484 :
485 : void
486 163010 : select_scheduler::
487 : wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
488 : {
489 163010 : if (idle_thread_count_ > 0)
490 : {
491 : // Idle worker exists - wake it via condvar
492 0 : wakeup_event_.notify_one();
493 0 : lock.unlock();
494 : }
495 163010 : else if (reactor_running_ && !reactor_interrupted_)
496 : {
497 : // No idle workers but reactor is running - interrupt it
498 208 : reactor_interrupted_ = true;
499 208 : lock.unlock();
500 208 : interrupt_reactor();
501 : }
502 : else
503 : {
504 : // No one to wake
505 162802 : lock.unlock();
506 : }
507 163010 : }
508 :
509 : struct work_guard
510 : {
511 : select_scheduler const* self;
512 341853 : ~work_guard() { self->work_finished(); }
513 : };
514 :
515 : long
516 6822 : select_scheduler::
517 : calculate_timeout(long requested_timeout_us) const
518 : {
519 6822 : if (requested_timeout_us == 0)
520 0 : return 0;
521 :
522 6822 : auto nearest = timer_svc_->nearest_expiry();
523 6822 : if (nearest == timer_service::time_point::max())
524 36 : return requested_timeout_us;
525 :
526 6786 : auto now = std::chrono::steady_clock::now();
527 6786 : if (nearest <= now)
528 38 : return 0;
529 :
530 6748 : auto timer_timeout_us = std::chrono::duration_cast<std::chrono::microseconds>(
531 13496 : nearest - now).count();
532 :
533 6748 : if (requested_timeout_us < 0)
534 6748 : return static_cast<long>(timer_timeout_us);
535 :
536 0 : return static_cast<long>((std::min)(
537 0 : static_cast<long long>(requested_timeout_us),
538 0 : static_cast<long long>(timer_timeout_us)));
539 : }
540 :
541 : void
542 82753 : select_scheduler::
543 : run_reactor(std::unique_lock<std::mutex>& lock)
544 : {
545 : // Calculate timeout considering timers, use 0 if interrupted
546 82753 : long effective_timeout_us = reactor_interrupted_ ? 0 : calculate_timeout(-1);
547 :
548 : // Build fd_sets from registered_fds_
549 : fd_set read_fds, write_fds, except_fds;
550 1406801 : FD_ZERO(&read_fds);
551 1406801 : FD_ZERO(&write_fds);
552 1406801 : FD_ZERO(&except_fds);
553 :
554 : // Always include the interrupt pipe
555 82753 : FD_SET(pipe_fds_[0], &read_fds);
556 82753 : int nfds = pipe_fds_[0];
557 :
558 : // Add registered fds
559 91479 : for (auto& [fd, state] : registered_fds_)
560 : {
561 8726 : if (state.read_op)
562 6546 : FD_SET(fd, &read_fds);
563 8726 : if (state.write_op)
564 : {
565 2180 : FD_SET(fd, &write_fds);
566 : // Also monitor for errors on connect operations
567 2180 : FD_SET(fd, &except_fds);
568 : }
569 8726 : if (fd > nfds)
570 6550 : nfds = fd;
571 : }
572 :
573 : // Convert timeout to timeval
574 : struct timeval tv;
575 82753 : struct timeval* tv_ptr = nullptr;
576 82753 : if (effective_timeout_us >= 0)
577 : {
578 82717 : tv.tv_sec = effective_timeout_us / 1000000;
579 82717 : tv.tv_usec = effective_timeout_us % 1000000;
580 82717 : tv_ptr = &tv;
581 : }
582 :
583 82753 : lock.unlock();
584 :
585 82753 : int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);
586 82753 : int saved_errno = errno;
587 :
588 : // Process timers outside the lock
589 82753 : timer_svc_->process_expired();
590 :
591 82753 : if (ready < 0 && saved_errno != EINTR)
592 0 : detail::throw_system_error(make_err(saved_errno), "select");
593 :
594 : // Re-acquire lock before modifying completed_ops_
595 82753 : lock.lock();
596 :
597 : // Drain the interrupt pipe if readable
598 82753 : if (ready > 0 && FD_ISSET(pipe_fds_[0], &read_fds))
599 : {
600 : char buf[256];
601 9126 : while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0) {}
602 : }
603 :
604 : // Process I/O completions
605 82753 : int completions_queued = 0;
606 82753 : if (ready > 0)
607 : {
608 : // Iterate over registered fds (copy keys to avoid iterator invalidation)
609 4565 : std::vector<int> fds_to_check;
610 4565 : fds_to_check.reserve(registered_fds_.size());
611 11117 : for (auto& [fd, state] : registered_fds_)
612 6552 : fds_to_check.push_back(fd);
613 :
614 11117 : for (int fd : fds_to_check)
615 : {
616 6552 : auto it = registered_fds_.find(fd);
617 6552 : if (it == registered_fds_.end())
618 0 : continue;
619 :
620 6552 : auto& state = it->second;
621 :
622 : // Check for errors (especially for connect operations)
623 6552 : bool has_error = FD_ISSET(fd, &except_fds);
624 :
625 : // Process read readiness
626 6552 : if (state.read_op && (FD_ISSET(fd, &read_fds) || has_error))
627 : {
628 2224 : auto* op = state.read_op;
629 : // Claim the op by exchanging to unregistered. Both registering and
630 : // registered states mean the op is ours to complete.
631 2224 : auto prev = op->registered.exchange(
632 : select_registration_state::unregistered, std::memory_order_acq_rel);
633 2224 : if (prev != select_registration_state::unregistered)
634 : {
635 2224 : state.read_op = nullptr;
636 :
637 2224 : if (has_error)
638 : {
639 0 : int errn = 0;
640 0 : socklen_t len = sizeof(errn);
641 0 : if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
642 0 : errn = errno;
643 0 : if (errn == 0)
644 0 : errn = EIO;
645 0 : op->complete(errn, 0);
646 : }
647 : else
648 : {
649 2224 : op->perform_io();
650 : }
651 :
652 2224 : completed_ops_.push(op);
653 2224 : ++completions_queued;
654 : }
655 : }
656 :
657 : // Process write readiness
658 6552 : if (state.write_op && (FD_ISSET(fd, &write_fds) || has_error))
659 : {
660 2180 : auto* op = state.write_op;
661 : // Claim the op by exchanging to unregistered. Both registering and
662 : // registered states mean the op is ours to complete.
663 2180 : auto prev = op->registered.exchange(
664 : select_registration_state::unregistered, std::memory_order_acq_rel);
665 2180 : if (prev != select_registration_state::unregistered)
666 : {
667 2180 : state.write_op = nullptr;
668 :
669 2180 : if (has_error)
670 : {
671 0 : int errn = 0;
672 0 : socklen_t len = sizeof(errn);
673 0 : if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
674 0 : errn = errno;
675 0 : if (errn == 0)
676 0 : errn = EIO;
677 0 : op->complete(errn, 0);
678 : }
679 : else
680 : {
681 2180 : op->perform_io();
682 : }
683 :
684 2180 : completed_ops_.push(op);
685 2180 : ++completions_queued;
686 : }
687 : }
688 :
689 : // Clean up empty entries
690 6552 : if (!state.read_op && !state.write_op)
691 4404 : registered_fds_.erase(it);
692 : }
693 4565 : }
694 :
695 82753 : if (completions_queued > 0)
696 : {
697 2228 : if (completions_queued == 1)
698 52 : wakeup_event_.notify_one();
699 : else
700 2176 : wakeup_event_.notify_all();
701 : }
702 82753 : }
703 :
704 : std::size_t
705 167504 : select_scheduler::
706 : do_one(long timeout_us)
707 : {
708 167504 : std::unique_lock lock(mutex_);
709 :
710 : for (;;)
711 : {
712 250257 : if (stopped_.load(std::memory_order_acquire))
713 18 : return 0;
714 :
715 250239 : scheduler_op* op = completed_ops_.pop();
716 :
717 250239 : if (op == &task_op_)
718 : {
719 82825 : bool more_handlers = !completed_ops_.empty();
720 :
721 82825 : if (!more_handlers)
722 : {
723 13788 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
724 : {
725 72 : completed_ops_.push(&task_op_);
726 72 : return 0;
727 : }
728 6822 : if (timeout_us == 0)
729 : {
730 0 : completed_ops_.push(&task_op_);
731 0 : return 0;
732 : }
733 : }
734 :
735 82753 : reactor_interrupted_ = more_handlers || timeout_us == 0;
736 82753 : reactor_running_ = true;
737 :
738 82753 : if (more_handlers && idle_thread_count_ > 0)
739 0 : wakeup_event_.notify_one();
740 :
741 82753 : run_reactor(lock);
742 :
743 82753 : reactor_running_ = false;
744 82753 : completed_ops_.push(&task_op_);
745 82753 : continue;
746 82753 : }
747 :
748 167414 : if (op != nullptr)
749 : {
750 167414 : lock.unlock();
751 167414 : work_guard g{this};
752 167414 : (*op)();
753 167414 : return 1;
754 167414 : }
755 :
756 0 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
757 0 : return 0;
758 :
759 0 : if (timeout_us == 0)
760 0 : return 0;
761 :
762 0 : ++idle_thread_count_;
763 0 : if (timeout_us < 0)
764 0 : wakeup_event_.wait(lock);
765 : else
766 0 : wakeup_event_.wait_for(lock, std::chrono::microseconds(timeout_us));
767 0 : --idle_thread_count_;
768 82753 : }
769 167504 : }
770 :
771 : } // namespace boost::corosio::detail
772 :
773 : #endif // BOOST_COROSIO_HAS_SELECT
|