TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/socket_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/select/select_socket.hpp>
22 : #include <boost/corosio/native/detail/select/select_scheduler.hpp>
23 :
24 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
25 : #include <boost/corosio/detail/dispatch_coro.hpp>
26 : #include <boost/corosio/native/detail/make_err.hpp>
27 :
28 : #include <boost/corosio/detail/except.hpp>
29 :
30 : #include <boost/capy/buffers.hpp>
31 :
32 : #include <errno.h>
33 : #include <fcntl.h>
34 : #include <netinet/in.h>
35 : #include <netinet/tcp.h>
36 : #include <sys/socket.h>
37 : #include <unistd.h>
38 :
39 : #include <memory>
40 : #include <mutex>
41 : #include <unordered_map>
42 :
43 : /*
44 : select Socket Implementation
45 : ============================
46 :
47 : This mirrors the epoll_sockets design for behavioral consistency.
48 : Each I/O operation follows the same pattern:
49 : 1. Try the syscall immediately (non-blocking socket)
50 : 2. If it succeeds or fails with a real error, post to completion queue
51 : 3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait
52 :
53 : Cancellation
54 : ------------
55 : See op.hpp for the completion/cancellation race handling via the
56 : `registered` atomic. cancel() must complete pending operations (post
57 : them with cancelled flag) so coroutines waiting on them can resume.
58 : close_socket() calls cancel() first to ensure this.
59 :
60 : Impl Lifetime with shared_ptr
61 : -----------------------------
62 : Socket impls use enable_shared_from_this. The service owns impls via
63 : shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
64 : removal. When a user calls close(), we call cancel() which posts pending
65 : ops to the scheduler.
66 :
67 : CRITICAL: The posted ops must keep the impl alive until they complete.
68 : Otherwise the scheduler would process a freed op (use-after-free). The
69 : cancel() method captures shared_from_this() into op.impl_ptr before
70 : posting. When the op completes, impl_ptr is cleared, allowing the impl
71 : to be destroyed if no other references exist.
72 :
73 : Service Ownership
74 : -----------------
75 : select_socket_service owns all socket impls. destroy() removes the
76 : shared_ptr from the map, but the impl may survive if ops still hold
77 : impl_ptr refs. shutdown() closes all sockets and clears the map; any
78 : in-flight ops will complete and release their refs.
79 : */
80 :
81 : namespace boost::corosio::detail {
82 :
83 : /** State for select socket service. */
84 : class select_socket_state
85 : {
86 : public:
87 HIT 168 : explicit select_socket_state(select_scheduler& sched) noexcept
88 168 : : sched_(sched)
89 : {
90 168 : }
91 :
92 : select_scheduler& sched_;
93 : std::mutex mutex_;
94 : intrusive_list<select_socket> socket_list_;
95 : std::unordered_map<select_socket*, std::shared_ptr<select_socket>>
96 : socket_ptrs_;
97 : };
98 :
99 : /** select socket service implementation.
100 :
101 : Inherits from socket_service to enable runtime polymorphism.
102 : Uses key_type = socket_service for service lookup.
103 : */
104 : class BOOST_COROSIO_DECL select_socket_service final : public socket_service
105 : {
106 : public:
107 : explicit select_socket_service(capy::execution_context& ctx);
108 : ~select_socket_service() override;
109 :
110 : select_socket_service(select_socket_service const&) = delete;
111 : select_socket_service& operator=(select_socket_service const&) = delete;
112 :
113 : void shutdown() override;
114 :
115 : io_object::implementation* construct() override;
116 : void destroy(io_object::implementation*) override;
117 : void close(io_object::handle&) override;
118 : std::error_code open_socket(
119 : tcp_socket::implementation& impl,
120 : int family,
121 : int type,
122 : int protocol) override;
123 :
124 10939 : select_scheduler& scheduler() const noexcept
125 : {
126 10939 : return state_->sched_;
127 : }
128 : void post(select_op* op);
129 : void work_started() noexcept;
130 : void work_finished() noexcept;
131 :
132 : private:
133 : std::unique_ptr<select_socket_state> state_;
134 : };
135 :
136 : // Backward compatibility alias
137 : using select_sockets = select_socket_service;
138 :
139 : inline void
140 98 : select_op::canceller::operator()() const noexcept
141 : {
142 98 : op->cancel();
143 98 : }
144 :
145 : inline void
146 MIS 0 : select_connect_op::cancel() noexcept
147 : {
148 0 : if (socket_impl_)
149 0 : socket_impl_->cancel_single_op(*this);
150 : else
151 0 : request_cancel();
152 0 : }
153 :
154 : inline void
155 HIT 98 : select_read_op::cancel() noexcept
156 : {
157 98 : if (socket_impl_)
158 98 : socket_impl_->cancel_single_op(*this);
159 : else
160 MIS 0 : request_cancel();
161 HIT 98 : }
162 :
163 : inline void
164 MIS 0 : select_write_op::cancel() noexcept
165 : {
166 0 : if (socket_impl_)
167 0 : socket_impl_->cancel_single_op(*this);
168 : else
169 0 : request_cancel();
170 0 : }
171 :
172 : inline void
173 HIT 3497 : select_connect_op::operator()()
174 : {
175 3497 : stop_cb.reset();
176 :
177 3497 : bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
178 :
179 : // Cache endpoints on successful connect
180 3497 : if (success && socket_impl_)
181 : {
182 3495 : endpoint local_ep;
183 3495 : sockaddr_storage local_storage{};
184 3495 : socklen_t local_len = sizeof(local_storage);
185 3495 : if (::getsockname(
186 3495 : fd, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
187 : 0)
188 3495 : local_ep = from_sockaddr(local_storage);
189 3495 : static_cast<select_socket*>(socket_impl_)
190 3495 : ->set_endpoints(local_ep, target_endpoint);
191 : }
192 :
193 3497 : if (ec_out)
194 : {
195 3497 : if (cancelled.load(std::memory_order_acquire))
196 MIS 0 : *ec_out = capy::error::canceled;
197 HIT 3497 : else if (errn != 0)
198 2 : *ec_out = make_err(errn);
199 : else
200 3495 : *ec_out = {};
201 : }
202 :
203 3497 : if (bytes_out)
204 MIS 0 : *bytes_out = bytes_transferred;
205 :
206 : // Move to stack before destroying the frame
207 HIT 3497 : capy::executor_ref saved_ex(ex);
208 3497 : std::coroutine_handle<> saved_h(h);
209 3497 : impl_ptr.reset();
210 3497 : dispatch_coro(saved_ex, saved_h).resume();
211 3497 : }
212 :
213 10511 : inline select_socket::select_socket(select_socket_service& svc) noexcept
214 10511 : : svc_(svc)
215 : {
216 10511 : }
217 :
218 : inline std::coroutine_handle<>
219 3497 : select_socket::connect(
220 : std::coroutine_handle<> h,
221 : capy::executor_ref ex,
222 : endpoint ep,
223 : std::stop_token token,
224 : std::error_code* ec)
225 : {
226 3497 : auto& op = conn_;
227 3497 : op.reset();
228 3497 : op.h = h;
229 3497 : op.ex = ex;
230 3497 : op.ec_out = ec;
231 3497 : op.fd = fd_;
232 3497 : op.target_endpoint = ep; // Store target for endpoint caching
233 3497 : op.start(token, this);
234 :
235 3497 : sockaddr_storage storage{};
236 : socklen_t addrlen =
237 3497 : detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
238 3497 : int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
239 :
240 3497 : if (result == 0)
241 : {
242 : // Sync success — cache endpoints immediately
243 MIS 0 : sockaddr_storage local_storage{};
244 0 : socklen_t local_len = sizeof(local_storage);
245 0 : if (::getsockname(
246 0 : fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
247 : 0)
248 0 : local_endpoint_ = detail::from_sockaddr(local_storage);
249 0 : remote_endpoint_ = ep;
250 :
251 0 : op.complete(0, 0);
252 0 : op.impl_ptr = shared_from_this();
253 0 : svc_.post(&op);
254 : // completion is always posted to scheduler queue, never inline.
255 0 : return std::noop_coroutine();
256 : }
257 :
258 HIT 3497 : if (errno == EINPROGRESS)
259 : {
260 3497 : svc_.work_started();
261 3497 : op.impl_ptr = shared_from_this();
262 :
263 : // Set registering BEFORE register_fd to close the race window where
264 : // reactor sees an event before we set registered. The reactor treats
265 : // registering the same as registered when claiming the op.
266 3497 : op.registered.store(
267 : select_registration_state::registering, std::memory_order_release);
268 3497 : svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
269 :
270 : // Transition to registered. If this fails, reactor or cancel already
271 : // claimed the op (state is now unregistered), so we're done. However,
272 : // we must still deregister the fd because cancel's deregister_fd may
273 : // have run before our register_fd, leaving the fd orphaned.
274 3497 : auto expected = select_registration_state::registering;
275 3497 : if (!op.registered.compare_exchange_strong(
276 : expected, select_registration_state::registered,
277 : std::memory_order_acq_rel))
278 : {
279 MIS 0 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
280 : // completion is always posted to scheduler queue, never inline.
281 0 : return std::noop_coroutine();
282 : }
283 :
284 : // If cancelled was set before we registered, handle it now.
285 HIT 3497 : if (op.cancelled.load(std::memory_order_acquire))
286 : {
287 MIS 0 : auto prev = op.registered.exchange(
288 : select_registration_state::unregistered,
289 : std::memory_order_acq_rel);
290 0 : if (prev != select_registration_state::unregistered)
291 : {
292 0 : svc_.scheduler().deregister_fd(
293 : fd_, select_scheduler::event_write);
294 0 : op.impl_ptr = shared_from_this();
295 0 : svc_.post(&op);
296 0 : svc_.work_finished();
297 : }
298 : }
299 : // completion is always posted to scheduler queue, never inline.
300 HIT 3497 : return std::noop_coroutine();
301 : }
302 :
303 MIS 0 : op.complete(errno, 0);
304 0 : op.impl_ptr = shared_from_this();
305 0 : svc_.post(&op);
306 : // completion is always posted to scheduler queue, never inline.
307 0 : return std::noop_coroutine();
308 : }
309 :
310 : inline std::coroutine_handle<>
311 HIT 77685 : select_socket::read_some(
312 : std::coroutine_handle<> h,
313 : capy::executor_ref ex,
314 : buffer_param param,
315 : std::stop_token token,
316 : std::error_code* ec,
317 : std::size_t* bytes_out)
318 : {
319 77685 : auto& op = rd_;
320 77685 : op.reset();
321 77685 : op.h = h;
322 77685 : op.ex = ex;
323 77685 : op.ec_out = ec;
324 77685 : op.bytes_out = bytes_out;
325 77685 : op.fd = fd_;
326 77685 : op.start(token, this);
327 :
328 77685 : capy::mutable_buffer bufs[select_read_op::max_buffers];
329 77685 : op.iovec_count =
330 77685 : static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));
331 :
332 77685 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
333 : {
334 1 : op.empty_buffer_read = true;
335 1 : op.complete(0, 0);
336 1 : op.impl_ptr = shared_from_this();
337 1 : svc_.post(&op);
338 1 : return std::noop_coroutine();
339 : }
340 :
341 155368 : for (int i = 0; i < op.iovec_count; ++i)
342 : {
343 77684 : op.iovecs[i].iov_base = bufs[i].data();
344 77684 : op.iovecs[i].iov_len = bufs[i].size();
345 : }
346 :
347 77684 : ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
348 :
349 77684 : if (n > 0)
350 : {
351 77401 : op.complete(0, static_cast<std::size_t>(n));
352 77401 : op.impl_ptr = shared_from_this();
353 77401 : svc_.post(&op);
354 77401 : return std::noop_coroutine();
355 : }
356 :
357 283 : if (n == 0)
358 : {
359 5 : op.complete(0, 0);
360 5 : op.impl_ptr = shared_from_this();
361 5 : svc_.post(&op);
362 5 : return std::noop_coroutine();
363 : }
364 :
365 278 : if (errno == EAGAIN || errno == EWOULDBLOCK)
366 : {
367 278 : svc_.work_started();
368 278 : op.impl_ptr = shared_from_this();
369 :
370 : // Set registering BEFORE register_fd to close the race window where
371 : // reactor sees an event before we set registered.
372 278 : op.registered.store(
373 : select_registration_state::registering, std::memory_order_release);
374 278 : svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
375 :
376 : // Transition to registered. If this fails, reactor or cancel already
377 : // claimed the op (state is now unregistered), so we're done. However,
378 : // we must still deregister the fd because cancel's deregister_fd may
379 : // have run before our register_fd, leaving the fd orphaned.
380 278 : auto expected = select_registration_state::registering;
381 278 : if (!op.registered.compare_exchange_strong(
382 : expected, select_registration_state::registered,
383 : std::memory_order_acq_rel))
384 : {
385 MIS 0 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
386 0 : return std::noop_coroutine();
387 : }
388 :
389 : // If cancelled was set before we registered, handle it now.
390 HIT 278 : if (op.cancelled.load(std::memory_order_acquire))
391 : {
392 MIS 0 : auto prev = op.registered.exchange(
393 : select_registration_state::unregistered,
394 : std::memory_order_acq_rel);
395 0 : if (prev != select_registration_state::unregistered)
396 : {
397 0 : svc_.scheduler().deregister_fd(
398 : fd_, select_scheduler::event_read);
399 0 : op.impl_ptr = shared_from_this();
400 0 : svc_.post(&op);
401 0 : svc_.work_finished();
402 : }
403 : }
404 HIT 278 : return std::noop_coroutine();
405 : }
406 :
407 MIS 0 : op.complete(errno, 0);
408 0 : op.impl_ptr = shared_from_this();
409 0 : svc_.post(&op);
410 0 : return std::noop_coroutine();
411 : }
412 :
413 : inline std::coroutine_handle<>
414 HIT 77524 : select_socket::write_some(
415 : std::coroutine_handle<> h,
416 : capy::executor_ref ex,
417 : buffer_param param,
418 : std::stop_token token,
419 : std::error_code* ec,
420 : std::size_t* bytes_out)
421 : {
422 77524 : auto& op = wr_;
423 77524 : op.reset();
424 77524 : op.h = h;
425 77524 : op.ex = ex;
426 77524 : op.ec_out = ec;
427 77524 : op.bytes_out = bytes_out;
428 77524 : op.fd = fd_;
429 77524 : op.start(token, this);
430 :
431 77524 : capy::mutable_buffer bufs[select_write_op::max_buffers];
432 77524 : op.iovec_count =
433 77524 : static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));
434 :
435 77524 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
436 : {
437 1 : op.complete(0, 0);
438 1 : op.impl_ptr = shared_from_this();
439 1 : svc_.post(&op);
440 1 : return std::noop_coroutine();
441 : }
442 :
443 155046 : for (int i = 0; i < op.iovec_count; ++i)
444 : {
445 77523 : op.iovecs[i].iov_base = bufs[i].data();
446 77523 : op.iovecs[i].iov_len = bufs[i].size();
447 : }
448 :
449 77523 : msghdr msg{};
450 77523 : msg.msg_iov = op.iovecs;
451 77523 : msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
452 :
453 77523 : ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
454 :
455 77523 : if (n > 0)
456 : {
457 77522 : op.complete(0, static_cast<std::size_t>(n));
458 77522 : op.impl_ptr = shared_from_this();
459 77522 : svc_.post(&op);
460 77522 : return std::noop_coroutine();
461 : }
462 :
463 1 : if (errno == EAGAIN || errno == EWOULDBLOCK)
464 : {
465 MIS 0 : svc_.work_started();
466 0 : op.impl_ptr = shared_from_this();
467 :
468 : // Set registering BEFORE register_fd to close the race window where
469 : // reactor sees an event before we set registered.
470 0 : op.registered.store(
471 : select_registration_state::registering, std::memory_order_release);
472 0 : svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
473 :
474 : // Transition to registered. If this fails, reactor or cancel already
475 : // claimed the op (state is now unregistered), so we're done. However,
476 : // we must still deregister the fd because cancel's deregister_fd may
477 : // have run before our register_fd, leaving the fd orphaned.
478 0 : auto expected = select_registration_state::registering;
479 0 : if (!op.registered.compare_exchange_strong(
480 : expected, select_registration_state::registered,
481 : std::memory_order_acq_rel))
482 : {
483 0 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
484 0 : return std::noop_coroutine();
485 : }
486 :
487 : // If cancelled was set before we registered, handle it now.
488 0 : if (op.cancelled.load(std::memory_order_acquire))
489 : {
490 0 : auto prev = op.registered.exchange(
491 : select_registration_state::unregistered,
492 : std::memory_order_acq_rel);
493 0 : if (prev != select_registration_state::unregistered)
494 : {
495 0 : svc_.scheduler().deregister_fd(
496 : fd_, select_scheduler::event_write);
497 0 : op.impl_ptr = shared_from_this();
498 0 : svc_.post(&op);
499 0 : svc_.work_finished();
500 : }
501 : }
502 0 : return std::noop_coroutine();
503 : }
504 :
505 HIT 1 : op.complete(errno ? errno : EIO, 0);
506 1 : op.impl_ptr = shared_from_this();
507 1 : svc_.post(&op);
508 1 : return std::noop_coroutine();
509 : }
510 :
511 : inline std::error_code
512 3 : select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
513 : {
514 : int how;
515 3 : switch (what)
516 : {
517 1 : case tcp_socket::shutdown_receive:
518 1 : how = SHUT_RD;
519 1 : break;
520 1 : case tcp_socket::shutdown_send:
521 1 : how = SHUT_WR;
522 1 : break;
523 1 : case tcp_socket::shutdown_both:
524 1 : how = SHUT_RDWR;
525 1 : break;
526 MIS 0 : default:
527 0 : return make_err(EINVAL);
528 : }
529 HIT 3 : if (::shutdown(fd_, how) != 0)
530 MIS 0 : return make_err(errno);
531 HIT 3 : return {};
532 : }
533 :
534 : inline std::error_code
535 28 : select_socket::set_option(
536 : int level, int optname, void const* data, std::size_t size) noexcept
537 : {
538 28 : if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
539 : 0)
540 MIS 0 : return make_err(errno);
541 HIT 28 : return {};
542 : }
543 :
544 : inline std::error_code
545 31 : select_socket::get_option(
546 : int level, int optname, void* data, std::size_t* size) const noexcept
547 : {
548 31 : socklen_t len = static_cast<socklen_t>(*size);
549 31 : if (::getsockopt(fd_, level, optname, data, &len) != 0)
550 MIS 0 : return make_err(errno);
551 HIT 31 : *size = static_cast<std::size_t>(len);
552 31 : return {};
553 : }
554 :
555 : inline void
556 173 : select_socket::cancel() noexcept
557 : {
558 173 : auto self = weak_from_this().lock();
559 173 : if (!self)
560 MIS 0 : return;
561 :
562 HIT 519 : auto cancel_op = [this, &self](select_op& op, int events) {
563 519 : auto prev = op.registered.exchange(
564 : select_registration_state::unregistered, std::memory_order_acq_rel);
565 519 : op.request_cancel();
566 519 : if (prev != select_registration_state::unregistered)
567 : {
568 90 : svc_.scheduler().deregister_fd(fd_, events);
569 90 : op.impl_ptr = self;
570 90 : svc_.post(&op);
571 90 : svc_.work_finished();
572 : }
573 692 : };
574 :
575 173 : cancel_op(conn_, select_scheduler::event_write);
576 173 : cancel_op(rd_, select_scheduler::event_read);
577 173 : cancel_op(wr_, select_scheduler::event_write);
578 173 : }
579 :
580 : inline void
581 98 : select_socket::cancel_single_op(select_op& op) noexcept
582 : {
583 98 : auto self = weak_from_this().lock();
584 98 : if (!self)
585 MIS 0 : return;
586 :
587 : // Called from stop_token callback to cancel a specific pending operation.
588 HIT 98 : auto prev = op.registered.exchange(
589 : select_registration_state::unregistered, std::memory_order_acq_rel);
590 98 : op.request_cancel();
591 :
592 98 : if (prev != select_registration_state::unregistered)
593 : {
594 : // Determine which event type to deregister
595 66 : int events = 0;
596 66 : if (&op == &conn_ || &op == &wr_)
597 MIS 0 : events = select_scheduler::event_write;
598 HIT 66 : else if (&op == &rd_)
599 66 : events = select_scheduler::event_read;
600 :
601 66 : svc_.scheduler().deregister_fd(fd_, events);
602 :
603 66 : op.impl_ptr = self;
604 66 : svc_.post(&op);
605 66 : svc_.work_finished();
606 : }
607 98 : }
608 :
609 : inline void
610 31541 : select_socket::close_socket() noexcept
611 : {
612 31541 : auto self = weak_from_this().lock();
613 31541 : if (self)
614 : {
615 94623 : auto cancel_op = [this, &self](select_op& op, int events) {
616 94623 : auto prev = op.registered.exchange(
617 : select_registration_state::unregistered,
618 : std::memory_order_acq_rel);
619 94623 : op.request_cancel();
620 94623 : if (prev != select_registration_state::unregistered)
621 : {
622 1 : svc_.scheduler().deregister_fd(fd_, events);
623 1 : op.impl_ptr = self;
624 1 : svc_.post(&op);
625 1 : svc_.work_finished();
626 : }
627 126164 : };
628 :
629 31541 : cancel_op(conn_, select_scheduler::event_write);
630 31541 : cancel_op(rd_, select_scheduler::event_read);
631 31541 : cancel_op(wr_, select_scheduler::event_write);
632 : }
633 :
634 31541 : if (fd_ >= 0)
635 : {
636 7007 : svc_.scheduler().deregister_fd(
637 : fd_, select_scheduler::event_read | select_scheduler::event_write);
638 7007 : ::close(fd_);
639 7007 : fd_ = -1;
640 : }
641 :
642 31541 : local_endpoint_ = endpoint{};
643 31541 : remote_endpoint_ = endpoint{};
644 31541 : }
645 :
646 168 : inline select_socket_service::select_socket_service(
647 168 : capy::execution_context& ctx)
648 168 : : state_(
649 : std::make_unique<select_socket_state>(
650 168 : ctx.use_service<select_scheduler>()))
651 : {
652 168 : }
653 :
654 336 : inline select_socket_service::~select_socket_service() {}
655 :
656 : inline void
657 168 : select_socket_service::shutdown()
658 : {
659 168 : std::lock_guard lock(state_->mutex_);
660 :
661 168 : while (auto* impl = state_->socket_list_.pop_front())
662 MIS 0 : impl->close_socket();
663 :
664 : // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
665 : // drains completed_ops_, calling destroy() on each queued op. Letting
666 : // ~state_ release the ptrs (during service destruction, after scheduler
667 : // shutdown) keeps every impl alive until all ops have been drained.
668 HIT 168 : }
669 :
670 : inline io_object::implementation*
671 10511 : select_socket_service::construct()
672 : {
673 10511 : auto impl = std::make_shared<select_socket>(*this);
674 10511 : auto* raw = impl.get();
675 :
676 : {
677 10511 : std::lock_guard lock(state_->mutex_);
678 10511 : state_->socket_list_.push_back(raw);
679 10511 : state_->socket_ptrs_.emplace(raw, std::move(impl));
680 10511 : }
681 :
682 10511 : return raw;
683 10511 : }
684 :
685 : inline void
686 10511 : select_socket_service::destroy(io_object::implementation* impl)
687 : {
688 10511 : auto* select_impl = static_cast<select_socket*>(impl);
689 10511 : select_impl->close_socket();
690 10511 : std::lock_guard lock(state_->mutex_);
691 10511 : state_->socket_list_.remove(select_impl);
692 10511 : state_->socket_ptrs_.erase(select_impl);
693 10511 : }
694 :
695 : inline std::error_code
696 3512 : select_socket_service::open_socket(
697 : tcp_socket::implementation& impl, int family, int type, int protocol)
698 : {
699 3512 : auto* select_impl = static_cast<select_socket*>(&impl);
700 3512 : select_impl->close_socket();
701 :
702 3512 : int fd = ::socket(family, type, protocol);
703 3512 : if (fd < 0)
704 MIS 0 : return make_err(errno);
705 :
706 HIT 3512 : if (family == AF_INET6)
707 : {
708 5 : int one = 1;
709 5 : ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
710 : }
711 :
712 : // Set non-blocking and close-on-exec
713 3512 : int flags = ::fcntl(fd, F_GETFL, 0);
714 3512 : if (flags == -1)
715 : {
716 MIS 0 : int errn = errno;
717 0 : ::close(fd);
718 0 : return make_err(errn);
719 : }
720 HIT 3512 : if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
721 : {
722 MIS 0 : int errn = errno;
723 0 : ::close(fd);
724 0 : return make_err(errn);
725 : }
726 HIT 3512 : if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
727 : {
728 MIS 0 : int errn = errno;
729 0 : ::close(fd);
730 0 : return make_err(errn);
731 : }
732 :
733 : // Check fd is within select() limits
734 HIT 3512 : if (fd >= FD_SETSIZE)
735 : {
736 MIS 0 : ::close(fd);
737 0 : return make_err(EMFILE); // Too many open files
738 : }
739 :
740 HIT 3512 : select_impl->fd_ = fd;
741 3512 : return {};
742 : }
743 :
744 : inline void
745 17518 : select_socket_service::close(io_object::handle& h)
746 : {
747 17518 : static_cast<select_socket*>(h.get())->close_socket();
748 17518 : }
749 :
750 : inline void
751 155088 : select_socket_service::post(select_op* op)
752 : {
753 155088 : state_->sched_.post(op);
754 155088 : }
755 :
756 : inline void
757 3775 : select_socket_service::work_started() noexcept
758 : {
759 3775 : state_->sched_.work_started();
760 3775 : }
761 :
762 : inline void
763 157 : select_socket_service::work_finished() noexcept
764 : {
765 157 : state_->sched_.work_finished();
766 157 : }
767 :
768 : } // namespace boost::corosio::detail
769 :
770 : #endif // BOOST_COROSIO_HAS_SELECT
771 :
772 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
|