TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_OP_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_OP_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/corosio/io/io_object.hpp>
19 : #include <boost/corosio/endpoint.hpp>
20 : #include <boost/capy/ex/executor_ref.hpp>
21 : #include <coroutine>
22 : #include <boost/capy/error.hpp>
23 : #include <system_error>
24 :
25 : #include <boost/corosio/native/detail/make_err.hpp>
26 : #include <boost/corosio/detail/dispatch_coro.hpp>
27 : #include <boost/corosio/detail/scheduler_op.hpp>
28 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
29 :
30 : #include <unistd.h>
31 : #include <errno.h>
32 : #include <fcntl.h>
33 :
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <stop_token>
39 :
40 : #include <netinet/in.h>
41 : #include <sys/select.h>
42 : #include <sys/socket.h>
43 : #include <sys/uio.h>
44 :
45 : /*
46 : select Operation State
47 : ======================
48 :
49 : Each async I/O operation has a corresponding select_op-derived struct that
50 : holds the operation's state while it's in flight. The socket impl owns
51 : fixed slots for each operation type (conn_, rd_, wr_), so only one
52 : operation of each type can be pending per socket at a time.
53 :
54 : This mirrors the epoll_op design for consistency across backends.
55 :
56 : Completion vs Cancellation Race
57 : -------------------------------
58 : The `registered` atomic uses a tri-state (unregistered, registering,
59 : registered) to handle two races: (1) between register_fd() and the
60 : reactor seeing an event, and (2) between reactor completion and cancel().
61 :
    The registering state closes the window where an event could arrive
    after register_fd() was called but before the op had been marked as
    registered. The reactor and cancel() both treat registering the same
    as registered when claiming.
65 :
66 : Whoever atomically exchanges to unregistered "claims" the operation
67 : and is responsible for completing it. The loser sees unregistered and
68 : does nothing. The initiating thread uses compare_exchange to transition
69 : from registering to registered; if this fails, the reactor or cancel
70 : already claimed the op.
71 :
72 : Impl Lifetime Management
73 : ------------------------
74 : When cancel() posts an op to the scheduler's ready queue, the socket impl
75 : might be destroyed before the scheduler processes the op. The `impl_ptr`
76 : member holds a shared_ptr to the impl, keeping it alive until the op
77 : completes.
78 :
79 : EOF Detection
80 : -------------
81 : For reads, 0 bytes with no error means EOF. But an empty user buffer also
82 : returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
83 :
84 : SIGPIPE Prevention
85 : ------------------
86 : Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
87 : SIGPIPE when the peer has closed.
88 : */
89 :
90 : namespace boost::corosio::detail {
91 :
// Owning impl types, declared here only so select_op can hold pointers
// to them for stop_token cancellation.
class select_acceptor;
class select_socket;
95 :
/** Reactor registration state of an asynchronous operation.

    Three states are used so that an event seen by run_reactor()
    between the call to register_fd() and the confirmation of the
    registration is not dropped: the operation is marked
    `registering` before register_fd() is called.
*/
enum class select_registration_state : std::uint8_t
{
    unregistered = 0, ///< Not registered with reactor
    registering  = 1, ///< register_fd() called, not yet confirmed
    registered   = 2  ///< Fully registered, ready for events
};
109 :
110 : struct select_op : scheduler_op
111 : {
112 : struct canceller
113 : {
114 : select_op* op;
115 : void operator()() const noexcept;
116 : };
117 :
118 : std::coroutine_handle<> h;
119 : capy::executor_ref ex;
120 : std::error_code* ec_out = nullptr;
121 : std::size_t* bytes_out = nullptr;
122 :
123 : int fd = -1;
124 : int errn = 0;
125 : std::size_t bytes_transferred = 0;
126 :
127 : std::atomic<bool> cancelled{false};
128 : std::atomic<select_registration_state> registered{
129 : select_registration_state::unregistered};
130 : std::optional<std::stop_callback<canceller>> stop_cb;
131 :
132 : // Prevents use-after-free when socket is closed with pending ops.
133 : std::shared_ptr<void> impl_ptr;
134 :
135 : // For stop_token cancellation - pointer to owning socket/acceptor impl.
136 : select_socket* socket_impl_ = nullptr;
137 : select_acceptor* acceptor_impl_ = nullptr;
138 :
139 HIT 31594 : select_op() = default;
140 :
141 162204 : void reset() noexcept
142 : {
143 162204 : fd = -1;
144 162204 : errn = 0;
145 162204 : bytes_transferred = 0;
146 162204 : cancelled.store(false, std::memory_order_relaxed);
147 162204 : registered.store(
148 : select_registration_state::unregistered, std::memory_order_relaxed);
149 162204 : impl_ptr.reset();
150 162204 : socket_impl_ = nullptr;
151 162204 : acceptor_impl_ = nullptr;
152 162204 : }
153 :
154 155209 : void operator()() override
155 : {
156 155209 : stop_cb.reset();
157 :
158 155209 : if (ec_out)
159 : {
160 155209 : if (cancelled.load(std::memory_order_acquire))
161 197 : *ec_out = capy::error::canceled;
162 155012 : else if (errn != 0)
163 1 : *ec_out = make_err(errn);
164 155011 : else if (is_read_operation() && bytes_transferred == 0)
165 5 : *ec_out = capy::error::eof;
166 : else
167 155006 : *ec_out = {};
168 : }
169 :
170 155209 : if (bytes_out)
171 155209 : *bytes_out = bytes_transferred;
172 :
173 : // Move to stack before destroying the frame
174 155209 : capy::executor_ref saved_ex(ex);
175 155209 : std::coroutine_handle<> saved_h(h);
176 155209 : impl_ptr.reset();
177 155209 : dispatch_coro(saved_ex, saved_h).resume();
178 155209 : }
179 :
180 77519 : virtual bool is_read_operation() const noexcept
181 : {
182 77519 : return false;
183 : }
184 : virtual void cancel() noexcept = 0;
185 :
186 MIS 0 : void destroy() override
187 : {
188 0 : stop_cb.reset();
189 0 : impl_ptr.reset();
190 0 : }
191 :
192 HIT 95482 : void request_cancel() noexcept
193 : {
194 95482 : cancelled.store(true, std::memory_order_release);
195 95482 : }
196 :
197 : void start(std::stop_token const& token)
198 : {
199 : cancelled.store(false, std::memory_order_release);
200 : stop_cb.reset();
201 : socket_impl_ = nullptr;
202 : acceptor_impl_ = nullptr;
203 :
204 : if (token.stop_possible())
205 : stop_cb.emplace(token, canceller{this});
206 : }
207 :
208 158706 : void start(std::stop_token const& token, select_socket* impl)
209 : {
210 158706 : cancelled.store(false, std::memory_order_release);
211 158706 : stop_cb.reset();
212 158706 : socket_impl_ = impl;
213 158706 : acceptor_impl_ = nullptr;
214 :
215 158706 : if (token.stop_possible())
216 99 : stop_cb.emplace(token, canceller{this});
217 158706 : }
218 :
219 3498 : void start(std::stop_token const& token, select_acceptor* impl)
220 : {
221 3498 : cancelled.store(false, std::memory_order_release);
222 3498 : stop_cb.reset();
223 3498 : socket_impl_ = nullptr;
224 3498 : acceptor_impl_ = impl;
225 :
226 3498 : if (token.stop_possible())
227 MIS 0 : stop_cb.emplace(token, canceller{this});
228 HIT 3498 : }
229 :
230 162044 : void complete(int err, std::size_t bytes) noexcept
231 : {
232 162044 : errn = err;
233 162044 : bytes_transferred = bytes;
234 162044 : }
235 :
236 MIS 0 : virtual void perform_io() noexcept {}
237 : };
238 :
239 : struct select_connect_op final : select_op
240 : {
241 : endpoint target_endpoint;
242 :
243 HIT 3497 : void reset() noexcept
244 : {
245 3497 : select_op::reset();
246 3497 : target_endpoint = endpoint{};
247 3497 : }
248 :
249 3497 : void perform_io() noexcept override
250 : {
251 : // connect() completion status is retrieved via SO_ERROR, not return value
252 3497 : int err = 0;
253 3497 : socklen_t len = sizeof(err);
254 3497 : if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
255 MIS 0 : err = errno;
256 HIT 3497 : complete(err, 0);
257 3497 : }
258 :
259 : // Defined in sockets.cpp where select_socket is complete
260 : void operator()() override;
261 : void cancel() noexcept override;
262 : };
263 :
264 : struct select_read_op final : select_op
265 : {
266 : static constexpr std::size_t max_buffers = 16;
267 : iovec iovecs[max_buffers];
268 : int iovec_count = 0;
269 : bool empty_buffer_read = false;
270 :
271 77492 : bool is_read_operation() const noexcept override
272 : {
273 77492 : return !empty_buffer_read;
274 : }
275 :
276 77685 : void reset() noexcept
277 : {
278 77685 : select_op::reset();
279 77685 : iovec_count = 0;
280 77685 : empty_buffer_read = false;
281 77685 : }
282 :
283 121 : void perform_io() noexcept override
284 : {
285 121 : ssize_t n = ::readv(fd, iovecs, iovec_count);
286 121 : if (n >= 0)
287 121 : complete(0, static_cast<std::size_t>(n));
288 : else
289 MIS 0 : complete(errno, 0);
290 HIT 121 : }
291 :
292 : void cancel() noexcept override;
293 : };
294 :
295 : struct select_write_op final : select_op
296 : {
297 : static constexpr std::size_t max_buffers = 16;
298 : iovec iovecs[max_buffers];
299 : int iovec_count = 0;
300 :
301 77524 : void reset() noexcept
302 : {
303 77524 : select_op::reset();
304 77524 : iovec_count = 0;
305 77524 : }
306 :
307 MIS 0 : void perform_io() noexcept override
308 : {
309 0 : msghdr msg{};
310 0 : msg.msg_iov = iovecs;
311 0 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
312 :
313 0 : ssize_t n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
314 0 : if (n >= 0)
315 0 : complete(0, static_cast<std::size_t>(n));
316 : else
317 0 : complete(errno, 0);
318 0 : }
319 :
320 : void cancel() noexcept override;
321 : };
322 :
323 : struct select_accept_op final : select_op
324 : {
325 : int accepted_fd = -1;
326 : io_object::implementation* peer_impl = nullptr;
327 : io_object::implementation** impl_out = nullptr;
328 :
329 HIT 3498 : void reset() noexcept
330 : {
331 3498 : select_op::reset();
332 3498 : accepted_fd = -1;
333 3498 : peer_impl = nullptr;
334 3498 : impl_out = nullptr;
335 3498 : }
336 :
337 3493 : void perform_io() noexcept override
338 : {
339 3493 : sockaddr_storage addr_storage{};
340 3493 : socklen_t addrlen = sizeof(addr_storage);
341 :
342 : // Note: select backend uses accept() + fcntl instead of accept4()
343 : // for broader POSIX compatibility
344 : int new_fd =
345 3493 : ::accept(fd, reinterpret_cast<sockaddr*>(&addr_storage), &addrlen);
346 :
347 3493 : if (new_fd >= 0)
348 : {
349 : // Reject fds that exceed select()'s FD_SETSIZE limit.
350 : // Better to fail now than during later async operations.
351 3493 : if (new_fd >= FD_SETSIZE)
352 : {
353 MIS 0 : ::close(new_fd);
354 0 : complete(EINVAL, 0);
355 0 : return;
356 : }
357 :
358 : // Set non-blocking and close-on-exec flags.
359 : // A non-blocking socket is essential for the async reactor;
360 : // if we can't configure it, fail rather than risk blocking.
361 HIT 3493 : int flags = ::fcntl(new_fd, F_GETFL, 0);
362 3493 : if (flags == -1)
363 : {
364 MIS 0 : int err = errno;
365 0 : ::close(new_fd);
366 0 : complete(err, 0);
367 0 : return;
368 : }
369 :
370 HIT 3493 : if (::fcntl(new_fd, F_SETFL, flags | O_NONBLOCK) == -1)
371 : {
372 MIS 0 : int err = errno;
373 0 : ::close(new_fd);
374 0 : complete(err, 0);
375 0 : return;
376 : }
377 :
378 HIT 3493 : if (::fcntl(new_fd, F_SETFD, FD_CLOEXEC) == -1)
379 : {
380 MIS 0 : int err = errno;
381 0 : ::close(new_fd);
382 0 : complete(err, 0);
383 0 : return;
384 : }
385 :
386 HIT 3493 : accepted_fd = new_fd;
387 3493 : complete(0, 0);
388 : }
389 : else
390 : {
391 MIS 0 : complete(errno, 0);
392 : }
393 : }
394 :
395 : // Defined in acceptors.cpp where select_acceptor is complete
396 : void operator()() override;
397 : void cancel() noexcept override;
398 : };
399 :
400 : } // namespace boost::corosio::detail
401 :
402 : #endif // BOOST_COROSIO_HAS_SELECT
403 :
404 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_OP_HPP
|