Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_DETAIL_SELECT_SOCKETS_HPP
11 : #define BOOST_COROSIO_DETAIL_SELECT_SOCKETS_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/corosio/tcp_socket.hpp>
19 : #include <boost/capy/ex/executor_ref.hpp>
20 : #include <boost/capy/ex/execution_context.hpp>
21 : #include "src/detail/intrusive.hpp"
22 : #include "src/detail/socket_service.hpp"
23 :
24 : #include "src/detail/select/op.hpp"
25 : #include "src/detail/select/scheduler.hpp"
26 :
27 : #include <memory>
28 : #include <mutex>
29 : #include <unordered_map>
30 :
31 : /*
32 : select Socket Implementation
33 : ============================
34 :
35 : This mirrors the epoll_sockets design for behavioral consistency.
36 : Each I/O operation follows the same pattern:
37 : 1. Try the syscall immediately (non-blocking socket)
38 : 2. If it succeeds or fails with a real error, post to completion queue
39 : 3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait
40 :
41 : Cancellation
42 : ------------
43 : See op.hpp for the completion/cancellation race handling via the
44 : `registered` atomic. cancel() must complete pending operations (post
45 : them with cancelled flag) so coroutines waiting on them can resume.
46 : close_socket() calls cancel() first to ensure this.
47 :
48 : Impl Lifetime with shared_ptr
49 : -----------------------------
50 : Socket impls use enable_shared_from_this. The service owns impls via
51 : shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
52 : removal. When a user calls close(), we call cancel() which posts pending
53 : ops to the scheduler.
54 :
55 : CRITICAL: The posted ops must keep the impl alive until they complete.
56 : Otherwise the scheduler would process a freed op (use-after-free). The
57 : cancel() method captures shared_from_this() into op.impl_ptr before
58 : posting. When the op completes, impl_ptr is cleared, allowing the impl
59 : to be destroyed if no other references exist.
60 :
61 : Service Ownership
62 : -----------------
63 : select_socket_service owns all socket impls. destroy_impl() removes the
64 : shared_ptr from the map, but the impl may survive if ops still hold
65 : impl_ptr refs. shutdown() closes all sockets and clears the map; any
66 : in-flight ops will complete and release their refs.
67 : */
68 :
69 : namespace boost::corosio::detail {
70 :
71 : class select_socket_service;
72 : class select_socket_impl;
73 :
74 : /// Socket implementation for select backend.
75 : class select_socket_impl
76 : : public tcp_socket::socket_impl
77 : , public std::enable_shared_from_this<select_socket_impl>
78 : , public intrusive_list<select_socket_impl>::node
79 : {
80 : friend class select_socket_service;
81 :
82 : public:
83 : explicit select_socket_impl(select_socket_service& svc) noexcept;
84 :
85 : void release() override;
86 :
87 : void connect(
88 : std::coroutine_handle<>,
89 : capy::executor_ref,
90 : endpoint,
91 : std::stop_token,
92 : std::error_code*) override;
93 :
94 : std::coroutine_handle<> read_some(
95 : std::coroutine_handle<>,
96 : capy::executor_ref,
97 : io_buffer_param,
98 : std::stop_token,
99 : std::error_code*,
100 : std::size_t*) override;
101 :
102 : std::coroutine_handle<> write_some(
103 : std::coroutine_handle<>,
104 : capy::executor_ref,
105 : io_buffer_param,
106 : std::stop_token,
107 : std::error_code*,
108 : std::size_t*) override;
109 :
110 : std::error_code shutdown(tcp_socket::shutdown_type what) noexcept override;
111 :
112 0 : native_handle_type native_handle() const noexcept override { return fd_; }
113 :
114 : // Socket options
115 : std::error_code set_no_delay(bool value) noexcept override;
116 : bool no_delay(std::error_code& ec) const noexcept override;
117 :
118 : std::error_code set_keep_alive(bool value) noexcept override;
119 : bool keep_alive(std::error_code& ec) const noexcept override;
120 :
121 : std::error_code set_receive_buffer_size(int size) noexcept override;
122 : int receive_buffer_size(std::error_code& ec) const noexcept override;
123 :
124 : std::error_code set_send_buffer_size(int size) noexcept override;
125 : int send_buffer_size(std::error_code& ec) const noexcept override;
126 :
127 : std::error_code set_linger(bool enabled, int timeout) noexcept override;
128 : tcp_socket::linger_options linger(std::error_code& ec) const noexcept override;
129 :
130 16 : endpoint local_endpoint() const noexcept override { return local_endpoint_; }
131 16 : endpoint remote_endpoint() const noexcept override { return remote_endpoint_; }
132 : bool is_open() const noexcept { return fd_ >= 0; }
133 : void cancel() noexcept override;
134 : void cancel_single_op(select_op& op) noexcept;
135 : void close_socket() noexcept;
136 2178 : void set_socket(int fd) noexcept { fd_ = fd; }
137 4356 : void set_endpoints(endpoint local, endpoint remote) noexcept
138 : {
139 4356 : local_endpoint_ = local;
140 4356 : remote_endpoint_ = remote;
141 4356 : }
142 :
143 : select_connect_op conn_;
144 : select_read_op rd_;
145 : select_write_op wr_;
146 :
147 : private:
148 : select_socket_service& svc_;
149 : int fd_ = -1;
150 : endpoint local_endpoint_;
151 : endpoint remote_endpoint_;
152 : };
153 :
154 : /** State for select socket service. */
155 : class select_socket_state
156 : {
157 : public:
158 120 : explicit select_socket_state(select_scheduler& sched) noexcept
159 120 : : sched_(sched)
160 : {
161 120 : }
162 :
163 : select_scheduler& sched_;
164 : std::mutex mutex_;
165 : intrusive_list<select_socket_impl> socket_list_;
166 : std::unordered_map<select_socket_impl*, std::shared_ptr<select_socket_impl>> socket_ptrs_;
167 : };
168 :
169 : /** select socket service implementation.
170 :
171 : Inherits from socket_service to enable runtime polymorphism.
172 : Uses key_type = socket_service for service lookup.
173 : */
174 : class select_socket_service : public socket_service
175 : {
176 : public:
177 : explicit select_socket_service(capy::execution_context& ctx);
178 : ~select_socket_service();
179 :
180 : select_socket_service(select_socket_service const&) = delete;
181 : select_socket_service& operator=(select_socket_service const&) = delete;
182 :
183 : void shutdown() override;
184 :
185 : tcp_socket::socket_impl& create_impl() override;
186 : void destroy_impl(tcp_socket::socket_impl& impl) override;
187 : std::error_code open_socket(tcp_socket::socket_impl& impl) override;
188 :
189 6829 : select_scheduler& scheduler() const noexcept { return state_->sched_; }
190 : void post(select_op* op);
191 : void work_started() noexcept;
192 : void work_finished() noexcept;
193 :
194 : private:
195 : std::unique_ptr<select_socket_state> state_;
196 : };
197 :
198 : // Backward compatibility alias
199 : using select_sockets = select_socket_service;
200 :
201 : } // namespace boost::corosio::detail
202 :
203 : #endif // BOOST_COROSIO_HAS_SELECT
204 :
205 : #endif // BOOST_COROSIO_DETAIL_SELECT_SOCKETS_HPP
|