//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_SELECT

#include "src/detail/select/sockets.hpp"
#include "src/detail/endpoint_convert.hpp"
#include "src/detail/make_err.hpp"

#include <boost/capy/buffers.hpp>

#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/uio.h> // readv
#include <unistd.h>

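// select() backend for corosio TCP sockets.
//
// Each I/O operation first attempts the syscall directly on the
// non-blocking fd; only when the kernel reports EAGAIN/EWOULDBLOCK
// (or EINPROGRESS for connect) is the fd registered with the
// select-based reactor. Completions are always delivered by posting
// the operation back to the scheduler, never by resuming the awaiting
// coroutine inline.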
namespace boost::corosio::detail {

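// Stop-token cancellation hook: when the caller's std::stop_token is
// signalled, this functor forwards to the concrete operation's
// cancel() override.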
void
select_op::canceller::
operator()() const noexcept
{
    op->cancel();
}

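// Per-operation cancellation. When the op is bound to a socket impl,
// cancellation is routed through cancel_single_op() so the fd
// registration can be claimed and torn down; otherwise request_cancel()
// just marks the op so the completion handler reports it as cancelled.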
void
select_connect_op::
cancel() noexcept
{
    if (socket_impl_)
        socket_impl_->cancel_single_op(*this);
    else
        request_cancel();
}

void
select_read_op::
cancel() noexcept
{
    if (socket_impl_)
        socket_impl_->cancel_single_op(*this);
    else
        request_cancel();
}

void
select_write_op::
cancel() noexcept
{
    if (socket_impl_)
        socket_impl_->cancel_single_op(*this);
    else
        request_cancel();
}

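// Completion handler for an asynchronous connect. Runs after the op has
// been posted back to the scheduler: caches the endpoints on success,
// translates the result into the caller's error_code, then resumes the
// awaiting coroutine through its executor.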
void
select_connect_op::
operator()()
{
    stop_cb.reset();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect
    if (success && socket_impl_)
    {
        // Query local endpoint via getsockname (may fail, but remote is always known)
        endpoint local_ep;
        sockaddr_in local_addr{};
        socklen_t local_len = sizeof(local_addr);
        if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
            local_ep = from_sockaddr_in(local_addr);
        // Always cache remote endpoint; local may be default if getsockname failed
        static_cast<select_socket_impl*>(socket_impl_)->set_endpoints(local_ep, target_endpoint);
    }

    if (ec_out)
    {
        if (cancelled.load(std::memory_order_acquire))
            *ec_out = capy::error::canceled;
        else if (errn != 0)
            *ec_out = make_err(errn);
        else
            *ec_out = {};
    }

    if (bytes_out)
        *bytes_out = bytes_transferred;

    // Move to stack before destroying the frame
    capy::executor_ref saved_ex( std::move( ex ) );
    capy::coro saved_h( std::move( h ) );
    impl_ptr.reset();
    saved_ex.dispatch( saved_h );
}

select_socket_impl::
select_socket_impl(select_socket_service& svc) noexcept
    : svc_(svc)
{
}

void
select_socket_impl::
release()
{
    close_socket();
    svc_.destroy_impl(*this);
}

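// Registration state machine shared by connect/read/write:
//
//   unregistered -> registering   (initiator, before register_fd)
//   registering  -> registered    (initiator, after register_fd)
//   any state    -> unregistered  (reactor or cancel claiming the op)
//
// Only the party that moves the state to `unregistered` posts the
// completion, so the op completes exactly once even when the reactor
// and a cancellation race. The initiator still deregisters the fd when
// its `registered` transition fails, because cancel's deregister_fd may
// have run before register_fd (see the comments in each function).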
void
select_socket_impl::
connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.fd = fd_;
    op.target_endpoint = ep; // Store target for endpoint caching
    op.start(token, this);

    sockaddr_in addr = detail::to_sockaddr_in(ep);
    int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));

    if (result == 0)
    {
        // Sync success - cache endpoints immediately
        sockaddr_in local_addr{};
        socklen_t local_len = sizeof(local_addr);
        if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
            local_endpoint_ = detail::from_sockaddr_in(local_addr);
        remote_endpoint_ = ep;

        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return;
    }

    if (errno == EINPROGRESS)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered. The reactor treats
        // registering the same as registered when claiming the op.
        op.registered.store(select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered, std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            return;
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered, std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return;
    }

    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
}

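// Scatter read. The readv() is attempted speculatively on the
// non-blocking fd: data or EOF completes the op immediately via post(),
// and only EAGAIN/EWOULDBLOCK registers the fd for read readiness with
// the reactor.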
std::coroutine_handle<>
select_socket_impl::
read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = rd_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);

    capy::mutable_buffer bufs[select_read_op::max_buffers];
    op.iovec_count = static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));

    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.empty_buffer_read = true;
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);

    if (n > 0)
    {
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (n == 0)
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered, std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered, std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}

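// Gather write, mirroring read_some. sendmsg() is used with MSG_NOSIGNAL
// so writing to a peer that has closed the connection fails with EPIPE
// instead of raising SIGPIPE.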
std::coroutine_handle<>
select_socket_impl::
write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = wr_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);

    capy::mutable_buffer bufs[select_write_op::max_buffers];
    op.iovec_count = static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));

    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    msghdr msg{};
    msg.msg_iov = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);

    if (n > 0)
    {
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered, std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered, std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    op.complete(errno ? errno : EIO, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}

std::error_code
select_socket_impl::
shutdown(tcp_socket::shutdown_type what) noexcept
{
    int how;
    switch (what)
    {
    case tcp_socket::shutdown_receive: how = SHUT_RD; break;
    case tcp_socket::shutdown_send: how = SHUT_WR; break;
    case tcp_socket::shutdown_both: how = SHUT_RDWR; break;
    default:
        return make_err(EINVAL);
    }
    if (::shutdown(fd_, how) != 0)
        return make_err(errno);
    return {};
}

std::error_code
select_socket_impl::
set_no_delay(bool value) noexcept
{
    int flag = value ? 1 : 0;
    if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
        return make_err(errno);
    return {};
}

bool
select_socket_impl::
no_delay(std::error_code& ec) const noexcept
{
    int flag = 0;
    socklen_t len = sizeof(flag);
    if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
    {
        ec = make_err(errno);
        return false;
    }
    ec = {};
    return flag != 0;
}

std::error_code
select_socket_impl::
set_keep_alive(bool value) noexcept
{
    int flag = value ? 1 : 0;
    if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
        return make_err(errno);
    return {};
}

bool
select_socket_impl::
keep_alive(std::error_code& ec) const noexcept
{
    int flag = 0;
    socklen_t len = sizeof(flag);
    if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
    {
        ec = make_err(errno);
        return false;
    }
    ec = {};
    return flag != 0;
}

std::error_code
select_socket_impl::
set_receive_buffer_size(int size) noexcept
{
    if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
        return make_err(errno);
    return {};
}

int
select_socket_impl::
receive_buffer_size(std::error_code& ec) const noexcept
{
    int size = 0;
    socklen_t len = sizeof(size);
    if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
    {
        ec = make_err(errno);
        return 0;
    }
    ec = {};
    return size;
}

std::error_code
select_socket_impl::
set_send_buffer_size(int size) noexcept
{
    if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
        return make_err(errno);
    return {};
}

int
select_socket_impl::
send_buffer_size(std::error_code& ec) const noexcept
{
    int size = 0;
    socklen_t len = sizeof(size);
    if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
    {
        ec = make_err(errno);
        return 0;
    }
    ec = {};
    return size;
}

std::error_code
select_socket_impl::
set_linger(bool enabled, int timeout) noexcept
{
    if (timeout < 0)
        return make_err(EINVAL);
    struct ::linger lg;
    lg.l_onoff = enabled ? 1 : 0;
    lg.l_linger = timeout;
    if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
        return make_err(errno);
    return {};
}

tcp_socket::linger_options
select_socket_impl::
linger(std::error_code& ec) const noexcept
{
    struct ::linger lg{};
    socklen_t len = sizeof(lg);
    if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
    {
        ec = make_err(errno);
        return {};
    }
    ec = {};
    return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
}

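// Cancel every pending operation on this socket. Each op's registration
// is claimed with an atomic exchange; only if this call wins the claim
// does it deregister the fd, post the op for completion, and balance the
// earlier work_started().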
void
select_socket_impl::
cancel() noexcept
{
    std::shared_ptr<select_socket_impl> self;
    try {
        self = shared_from_this();
    } catch (const std::bad_weak_ptr&) {
        return;
    }

    auto cancel_op = [this, &self](select_op& op, int events) {
        auto prev = op.registered.exchange(
            select_registration_state::unregistered, std::memory_order_acq_rel);
        op.request_cancel();
        if (prev != select_registration_state::unregistered)
        {
            svc_.scheduler().deregister_fd(fd_, events);
            op.impl_ptr = self;
            svc_.post(&op);
            svc_.work_finished();
        }
    };

    cancel_op(conn_, select_scheduler::event_write);
    cancel_op(rd_, select_scheduler::event_read);
    cancel_op(wr_, select_scheduler::event_write);
}

void
select_socket_impl::
cancel_single_op(select_op& op) noexcept
{
    // Called from stop_token callback to cancel a specific pending operation.
    auto prev = op.registered.exchange(
        select_registration_state::unregistered, std::memory_order_acq_rel);
    op.request_cancel();

    if (prev != select_registration_state::unregistered)
    {
        // Determine which event type to deregister
        int events = 0;
        if (&op == &conn_ || &op == &wr_)
            events = select_scheduler::event_write;
        else if (&op == &rd_)
            events = select_scheduler::event_read;

        svc_.scheduler().deregister_fd(fd_, events);

        // Keep impl alive until op completes
        try {
            op.impl_ptr = shared_from_this();
        } catch (const std::bad_weak_ptr&) {
            // Impl is being destroyed, op will be orphaned but that's ok
        }

        svc_.post(&op);
        svc_.work_finished();
    }
}

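// Tear down the native socket: cancel anything in flight, drop the fd
// from the reactor, close it, and forget the cached endpoints.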
void
select_socket_impl::
close_socket() noexcept
{
    cancel();

    if (fd_ >= 0)
    {
        // Unconditionally remove from registered_fds_ to handle edge cases
        // where the fd might be registered but cancel() didn't clean it up
        // due to race conditions.
        svc_.scheduler().deregister_fd(fd_,
            select_scheduler::event_read | select_scheduler::event_write);
        ::close(fd_);
        fd_ = -1;
    }

    // Clear cached endpoints
    local_endpoint_ = endpoint{};
    remote_endpoint_ = endpoint{};
}

select_socket_service::
select_socket_service(capy::execution_context& ctx)
    : state_(std::make_unique<select_socket_state>(ctx.use_service<select_scheduler>()))
{
}

select_socket_service::
~select_socket_service()
{
}

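// Service shutdown: close every socket that is still alive and release
// the shared_ptr map that keeps the impls pinned.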
void
select_socket_service::
shutdown()
{
    std::lock_guard lock(state_->mutex_);

    while (auto* impl = state_->socket_list_.pop_front())
        impl->close_socket();

    state_->socket_ptrs_.clear();
}

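// Impl lifetime: each socket impl is tracked both in the intrusive list
// (so shutdown() can walk the live sockets) and in the shared_ptr map
// (which owns it). Pending operations extend the lifetime independently
// through op.impl_ptr.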
tcp_socket::socket_impl&
select_socket_service::
create_impl()
{
    auto impl = std::make_shared<select_socket_impl>(*this);
    auto* raw = impl.get();

    {
        std::lock_guard lock(state_->mutex_);
        state_->socket_list_.push_back(raw);
        state_->socket_ptrs_.emplace(raw, std::move(impl));
    }

    return *raw;
}

void
select_socket_service::
destroy_impl(tcp_socket::socket_impl& impl)
{
    auto* select_impl = static_cast<select_socket_impl*>(&impl);
    std::lock_guard lock(state_->mutex_);
    state_->socket_list_.remove(select_impl);
    state_->socket_ptrs_.erase(select_impl);
}

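// Create the native fd for an impl: a non-blocking, close-on-exec IPv4
// stream socket. Descriptors at or above FD_SETSIZE are rejected because
// select() cannot wait on them.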
std::error_code
select_socket_service::
open_socket(tcp_socket::socket_impl& impl)
{
    auto* select_impl = static_cast<select_socket_impl*>(&impl);
    select_impl->close_socket();

    int fd = ::socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0)
        return make_err(errno);

    // Set non-blocking and close-on-exec
    int flags = ::fcntl(fd, F_GETFL, 0);
    if (flags == -1)
    {
        int errn = errno;
        ::close(fd);
        return make_err(errn);
    }
    if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
    {
        int errn = errno;
        ::close(fd);
        return make_err(errn);
    }
    if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
    {
        int errn = errno;
        ::close(fd);
        return make_err(errn);
    }

    // Check fd is within select() limits
    if (fd >= FD_SETSIZE)
    {
        ::close(fd);
        return make_err(EMFILE); // Too many open files
    }

    select_impl->fd_ = fd;
    return {};
}

void
select_socket_service::
post(select_op* op)
{
    state_->sched_.post(op);
}

void
select_socket_service::
work_started() noexcept
{
    state_->sched_.work_started();
}

void
select_socket_service::
work_finished() noexcept
{
    state_->sched_.work_finished();
}

} // namespace boost::corosio::detail

#endif // BOOST_COROSIO_HAS_SELECT