include/boost/corosio/native/detail/select/select_socket_service.hpp

75.7% Lines (262/346) 93.1% Functions (27/29)
include/boost/corosio/native/detail/select/select_socket_service.hpp
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/socket_service.hpp>
20
21 #include <boost/corosio/native/detail/select/select_socket.hpp>
22 #include <boost/corosio/native/detail/select/select_scheduler.hpp>
23
24 #include <boost/corosio/native/detail/endpoint_convert.hpp>
25 #include <boost/corosio/detail/dispatch_coro.hpp>
26 #include <boost/corosio/native/detail/make_err.hpp>
27
28 #include <boost/corosio/detail/except.hpp>
29
30 #include <boost/capy/buffers.hpp>
31
32 #include <errno.h>
33 #include <fcntl.h>
34 #include <netinet/in.h>
35 #include <netinet/tcp.h>
36 #include <sys/socket.h>
37 #include <unistd.h>
38
39 #include <memory>
40 #include <mutex>
41 #include <unordered_map>
42
43 /*
44 select Socket Implementation
45 ============================
46
47 This mirrors the epoll_sockets design for behavioral consistency.
48 Each I/O operation follows the same pattern:
49 1. Try the syscall immediately (non-blocking socket)
50 2. If it succeeds or fails with a real error, post to completion queue
51 3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait
52
53 Cancellation
54 ------------
55 See op.hpp for the completion/cancellation race handling via the
56 `registered` atomic. cancel() must complete pending operations (post
57 them with cancelled flag) so coroutines waiting on them can resume.
58 close_socket() calls cancel() first to ensure this.
59
60 Impl Lifetime with shared_ptr
61 -----------------------------
62 Socket impls use enable_shared_from_this. The service owns impls via
63 shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
64 removal. When a user calls close(), we call cancel() which posts pending
65 ops to the scheduler.
66
67 CRITICAL: The posted ops must keep the impl alive until they complete.
68 Otherwise the scheduler would process a freed op (use-after-free). The
69 cancel() method captures shared_from_this() into op.impl_ptr before
70 posting. When the op completes, impl_ptr is cleared, allowing the impl
71 to be destroyed if no other references exist.
72
73 Service Ownership
74 -----------------
75 select_socket_service owns all socket impls. destroy() removes the
76 shared_ptr from the map, but the impl may survive if ops still hold
77 impl_ptr refs. shutdown() closes all sockets and clears the map; any
78 in-flight ops will complete and release their refs.
79 */
80
81 namespace boost::corosio::detail {
82
/** State for select socket service.

    Bundles the scheduler reference and the impl-ownership bookkeeping so
    select_socket_service can hold it behind a single unique_ptr.
*/
class select_socket_state
{
public:
    explicit select_socket_state(select_scheduler& sched) noexcept
        : sched_(sched)
    {
    }

    /// Reactor used for fd registration and op completion posting.
    select_scheduler& sched_;
    /// Guards socket_list_ and socket_ptrs_.
    std::mutex mutex_;
    /// All live impls, for bulk close during shutdown().
    intrusive_list<select_socket> socket_list_;
    /// Owning refs keyed by raw pointer for O(1) lookup/removal.
    std::unordered_map<select_socket*, std::shared_ptr<select_socket>>
        socket_ptrs_;
};
98
/** select socket service implementation.

    Inherits from socket_service to enable runtime polymorphism.
    Uses key_type = socket_service for service lookup.
*/
class BOOST_COROSIO_DECL select_socket_service final : public socket_service
{
public:
    explicit select_socket_service(capy::execution_context& ctx);
    ~select_socket_service() override;

    select_socket_service(select_socket_service const&) = delete;
    select_socket_service& operator=(select_socket_service const&) = delete;

    /// Close all sockets; impls stay alive until queued ops have drained.
    void shutdown() override;

    /// Create a new socket impl owned by this service.
    io_object::implementation* construct() override;
    /// Close the impl and drop the service's owning shared_ptr to it.
    void destroy(io_object::implementation*) override;
    /// Close the socket behind the given handle.
    void close(io_object::handle&) override;
    /// Open a non-blocking, close-on-exec socket; fails with EMFILE if the
    /// new fd is >= FD_SETSIZE (select() cannot monitor it).
    std::error_code open_socket(
        tcp_socket::implementation& impl,
        int family,
        int type,
        int protocol) override;

    /// The reactor this service registers fds with.
    select_scheduler& scheduler() const noexcept
    {
        return state_->sched_;
    }
    /// Enqueue a completed op on the scheduler's completion queue.
    void post(select_op* op);
    /// Forwarded work counting — keeps the scheduler's run loop alive.
    void work_started() noexcept;
    void work_finished() noexcept;

private:
    std::unique_ptr<select_socket_state> state_;
};
135
// Backward-compatibility alias for code written against the old type name.
using select_sockets = select_socket_service;
138
// Invoked by the std::stop_token callback machinery; dispatches to the
// concrete op's cancel() override.
inline void
select_op::canceller::operator()() const noexcept
{
    op->cancel();
}
144
145 inline void
146 select_connect_op::cancel() noexcept
147 {
148 if (socket_impl_)
149 socket_impl_->cancel_single_op(*this);
150 else
151 request_cancel();
152 }
153
154 inline void
155 98 select_read_op::cancel() noexcept
156 {
157 98 if (socket_impl_)
158 98 socket_impl_->cancel_single_op(*this);
159 else
160 request_cancel();
161 98 }
162
163 inline void
164 select_write_op::cancel() noexcept
165 {
166 if (socket_impl_)
167 socket_impl_->cancel_single_op(*this);
168 else
169 request_cancel();
170 }
171
// Completion handler for an async connect, run from the scheduler's
// completion queue. Caches endpoints on success, publishes error/byte
// results, then resumes the waiting coroutine.
inline void
select_connect_op::operator()()
{
    // Detach the stop-token callback first so cancel() can no longer fire.
    stop_cb.reset();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect
    if (success && socket_impl_)
    {
        endpoint local_ep;
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        // If getsockname fails, local_ep stays default-constructed.
        if (::getsockname(
                fd, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
            0)
            local_ep = from_sockaddr(local_storage);
        static_cast<select_socket*>(socket_impl_)
            ->set_endpoints(local_ep, target_endpoint);
    }

    // Cancellation takes precedence over any syscall error.
    if (ec_out)
    {
        if (cancelled.load(std::memory_order_acquire))
            *ec_out = capy::error::canceled;
        else if (errn != 0)
            *ec_out = make_err(errn);
        else
            *ec_out = {};
    }

    if (bytes_out)
        *bytes_out = bytes_transferred;

    // Move to stack before destroying the frame: impl_ptr.reset() may
    // destroy the impl (and this op's storage), so ex/h must be copied out
    // before the reset and resume.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    impl_ptr.reset();
    dispatch_coro(saved_ex, saved_h).resume();
}
212
// Bind the impl to its owning service; the fd is opened later via
// open_socket().
inline select_socket::select_socket(select_socket_service& svc) noexcept
    : svc_(svc)
{
}
217
/** Begin an async connect to `ep`.

    Attempts ::connect() immediately on the non-blocking fd. A synchronous
    result (success or hard error) is posted straight to the completion
    queue; EINPROGRESS registers the fd for writability with the reactor.
    Always returns noop_coroutine(): completion is never inline.
*/
inline std::coroutine_handle<>
select_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.fd = fd_;
    op.target_endpoint = ep; // Store target for endpoint caching
    op.start(token, this);

    sockaddr_storage storage{};
    socklen_t addrlen =
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
    int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);

    if (result == 0)
    {
        // Sync success — cache endpoints immediately
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        if (::getsockname(
                fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
            0)
            local_endpoint_ = detail::from_sockaddr(local_storage);
        remote_endpoint_ = ep;

        op.complete(0, 0);
        // Keep the impl alive until the posted op runs (see file header).
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    if (errno == EINPROGRESS)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered. The reactor treats
        // registering the same as registered when claiming the op.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            // completion is always posted to scheduler queue, never inline.
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            // exchange() ensures only one party (us, reactor, or cancel)
            // posts the op.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    // Hard synchronous error: record errno and post the completion.
    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    // completion is always posted to scheduler queue, never inline.
    return std::noop_coroutine();
}
309
/** Begin an async scatter read.

    Attempts ::readv() immediately. Data or EOF posts straight to the
    completion queue; EAGAIN/EWOULDBLOCK registers the fd for readability
    with the reactor. Always returns noop_coroutine(): completion is never
    inline.
*/
inline std::coroutine_handle<>
select_socket::read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = rd_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);

    capy::mutable_buffer bufs[select_read_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));

    // Zero-byte read request: complete immediately as success.
    // NOTE(review): several all-empty buffers bypass this check and reach
    // readv — presumably returning 0 and completing like EOF; confirm
    // intended.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.empty_buffer_read = true;
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);

    if (n > 0)
    {
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // n == 0 is end-of-stream; completed with zero bytes and no error.
    if (n == 0)
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_read);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // Hard synchronous error: record errno and post the completion.
    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
412
/** Begin an async gather write.

    Attempts ::sendmsg() immediately (MSG_NOSIGNAL suppresses SIGPIPE).
    A synchronous result posts straight to the completion queue;
    EAGAIN/EWOULDBLOCK registers the fd for writability with the reactor.
    Always returns noop_coroutine(): completion is never inline.
*/
inline std::coroutine_handle<>
select_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = wr_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);

    capy::mutable_buffer bufs[select_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));

    // Zero-byte write request: complete immediately as success.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    msghdr msg{};
    msg.msg_iov = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);

    if (n > 0)
    {
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // Hard synchronous error. A sendmsg result of 0 or -1 with errno == 0
    // is mapped to EIO so the caller always sees a real error code.
    op.complete(errno ? errno : EIO, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
510
511 inline std::error_code
512 3 select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
513 {
514 int how;
515 3 switch (what)
516 {
517 1 case tcp_socket::shutdown_receive:
518 1 how = SHUT_RD;
519 1 break;
520 1 case tcp_socket::shutdown_send:
521 1 how = SHUT_WR;
522 1 break;
523 1 case tcp_socket::shutdown_both:
524 1 how = SHUT_RDWR;
525 1 break;
526 default:
527 return make_err(EINVAL);
528 }
529 3 if (::shutdown(fd_, how) != 0)
530 return make_err(errno);
531 3 return {};
532 }
533
534 inline std::error_code
535 28 select_socket::set_option(
536 int level, int optname, void const* data, std::size_t size) noexcept
537 {
538 28 if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
539 0)
540 return make_err(errno);
541 28 return {};
542 }
543
544 inline std::error_code
545 31 select_socket::get_option(
546 int level, int optname, void* data, std::size_t* size) const noexcept
547 {
548 31 socklen_t len = static_cast<socklen_t>(*size);
549 31 if (::getsockopt(fd_, level, optname, data, &len) != 0)
550 return make_err(errno);
551 31 *size = static_cast<std::size_t>(len);
552 31 return {};
553 }
554
// Cancel all three pending op slots (connect, read, write). Posts any op
// that was still registered so its waiting coroutine resumes with the
// cancelled flag set.
inline void
select_socket::cancel() noexcept
{
    // Pin the impl; if the weak ref is already dead there is nothing to do.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    auto cancel_op = [this, &self](select_op& op, int events) {
        // exchange() atomically claims the op: exactly one of reactor,
        // cancel, or the initiating thread wins and posts it.
        auto prev = op.registered.exchange(
            select_registration_state::unregistered, std::memory_order_acq_rel);
        op.request_cancel();
        if (prev != select_registration_state::unregistered)
        {
            svc_.scheduler().deregister_fd(fd_, events);
            // The posted op must keep the impl alive (see file header).
            op.impl_ptr = self;
            svc_.post(&op);
            svc_.work_finished();
        }
    };

    cancel_op(conn_, select_scheduler::event_write);
    cancel_op(rd_, select_scheduler::event_read);
    cancel_op(wr_, select_scheduler::event_write);
}
579
// Cancel exactly one pending op (invoked from its stop_token callback).
// If the op was still registered with the reactor, deregister the fd and
// post the op so the waiting coroutine resumes as cancelled.
inline void
select_socket::cancel_single_op(select_op& op) noexcept
{
    // Pin the impl; if the weak ref is already dead there is nothing to do.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    // Called from stop_token callback to cancel a specific pending operation.
    // exchange() atomically claims the op against the reactor.
    auto prev = op.registered.exchange(
        select_registration_state::unregistered, std::memory_order_acq_rel);
    op.request_cancel();

    if (prev != select_registration_state::unregistered)
    {
        // Determine which event type to deregister
        int events = 0;
        if (&op == &conn_ || &op == &wr_)
            events = select_scheduler::event_write;
        else if (&op == &rd_)
            events = select_scheduler::event_read;

        svc_.scheduler().deregister_fd(fd_, events);

        // The posted op must keep the impl alive (see file header).
        op.impl_ptr = self;
        svc_.post(&op);
        svc_.work_finished();
    }
}
608
// Cancel all pending ops, deregister and close the fd, and clear the
// cached endpoints. Safe to call repeatedly; fd_ is -1 when closed.
inline void
select_socket::close_socket() noexcept
{
    // May be called during destruction when the weak ref is already dead;
    // in that case skip the cancel step and just close the fd.
    auto self = weak_from_this().lock();
    if (self)
    {
        auto cancel_op = [this, &self](select_op& op, int events) {
            // exchange() atomically claims the op against the reactor.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            op.request_cancel();
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(fd_, events);
                // The posted op must keep the impl alive (see file header).
                op.impl_ptr = self;
                svc_.post(&op);
                svc_.work_finished();
            }
        };

        cancel_op(conn_, select_scheduler::event_write);
        cancel_op(rd_, select_scheduler::event_read);
        cancel_op(wr_, select_scheduler::event_write);
    }

    if (fd_ >= 0)
    {
        // Belt-and-braces: drop any remaining reactor interest before close.
        svc_.scheduler().deregister_fd(
            fd_, select_scheduler::event_read | select_scheduler::event_write);
        ::close(fd_);
        fd_ = -1;
    }

    local_endpoint_ = endpoint{};
    remote_endpoint_ = endpoint{};
}
645
646 168 inline select_socket_service::select_socket_service(
647 168 capy::execution_context& ctx)
648 168 : state_(
649 std::make_unique<select_socket_state>(
650 168 ctx.use_service<select_scheduler>()))
651 {
652 168 }
653
654 336 inline select_socket_service::~select_socket_service() {}
655
656 inline void
657 168 select_socket_service::shutdown()
658 {
659 168 std::lock_guard lock(state_->mutex_);
660
661 168 while (auto* impl = state_->socket_list_.pop_front())
662 impl->close_socket();
663
664 // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
665 // drains completed_ops_, calling destroy() on each queued op. Letting
666 // ~state_ release the ptrs (during service destruction, after scheduler
667 // shutdown) keeps every impl alive until all ops have been drained.
668 168 }
669
670 inline io_object::implementation*
671 10511 select_socket_service::construct()
672 {
673 10511 auto impl = std::make_shared<select_socket>(*this);
674 10511 auto* raw = impl.get();
675
676 {
677 10511 std::lock_guard lock(state_->mutex_);
678 10511 state_->socket_list_.push_back(raw);
679 10511 state_->socket_ptrs_.emplace(raw, std::move(impl));
680 10511 }
681
682 10511 return raw;
683 10511 }
684
685 inline void
686 10511 select_socket_service::destroy(io_object::implementation* impl)
687 {
688 10511 auto* select_impl = static_cast<select_socket*>(impl);
689 10511 select_impl->close_socket();
690 10511 std::lock_guard lock(state_->mutex_);
691 10511 state_->socket_list_.remove(select_impl);
692 10511 state_->socket_ptrs_.erase(select_impl);
693 10511 }
694
695 inline std::error_code
696 3512 select_socket_service::open_socket(
697 tcp_socket::implementation& impl, int family, int type, int protocol)
698 {
699 3512 auto* select_impl = static_cast<select_socket*>(&impl);
700 3512 select_impl->close_socket();
701
702 3512 int fd = ::socket(family, type, protocol);
703 3512 if (fd < 0)
704 return make_err(errno);
705
706 3512 if (family == AF_INET6)
707 {
708 5 int one = 1;
709 5 ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
710 }
711
712 // Set non-blocking and close-on-exec
713 3512 int flags = ::fcntl(fd, F_GETFL, 0);
714 3512 if (flags == -1)
715 {
716 int errn = errno;
717 ::close(fd);
718 return make_err(errn);
719 }
720 3512 if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
721 {
722 int errn = errno;
723 ::close(fd);
724 return make_err(errn);
725 }
726 3512 if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
727 {
728 int errn = errno;
729 ::close(fd);
730 return make_err(errn);
731 }
732
733 // Check fd is within select() limits
734 3512 if (fd >= FD_SETSIZE)
735 {
736 ::close(fd);
737 return make_err(EMFILE); // Too many open files
738 }
739
740 3512 select_impl->fd_ = fd;
741 3512 return {};
742 }
743
744 inline void
745 17518 select_socket_service::close(io_object::handle& h)
746 {
747 17518 static_cast<select_socket*>(h.get())->close_socket();
748 17518 }
749
750 inline void
751 155088 select_socket_service::post(select_op* op)
752 {
753 155088 state_->sched_.post(op);
754 155088 }
755
756 inline void
757 3775 select_socket_service::work_started() noexcept
758 {
759 3775 state_->sched_.work_started();
760 3775 }
761
762 inline void
763 157 select_socket_service::work_finished() noexcept
764 {
765 157 state_->sched_.work_finished();
766 157 }
767
768 } // namespace boost::corosio::detail
769
770 #endif // BOOST_COROSIO_HAS_SELECT
771
772 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
773