TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_ACCEPTOR_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_ACCEPTOR_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/acceptor_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/select/select_acceptor.hpp>
22 : #include <boost/corosio/native/detail/select/select_socket_service.hpp>
23 : #include <boost/corosio/native/detail/select/select_scheduler.hpp>
24 :
25 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
26 : #include <boost/corosio/detail/dispatch_coro.hpp>
27 : #include <boost/corosio/native/detail/make_err.hpp>
28 :
29 : #include <errno.h>
30 : #include <fcntl.h>
31 : #include <netinet/in.h>
32 : #include <sys/socket.h>
33 : #include <unistd.h>
34 :
35 : #include <memory>
36 : #include <mutex>
37 : #include <unordered_map>
38 :
39 : namespace boost::corosio::detail {
40 :
41 : /** State for select acceptor service. */
42 : class select_acceptor_state
43 : {
44 : public:
45 HIT 168 : explicit select_acceptor_state(select_scheduler& sched) noexcept
46 168 : : sched_(sched)
47 : {
48 168 : }
49 :
50 : select_scheduler& sched_;
51 : std::mutex mutex_;
52 : intrusive_list<select_acceptor> acceptor_list_;
53 : std::unordered_map<select_acceptor*, std::shared_ptr<select_acceptor>>
54 : acceptor_ptrs_;
55 : };
56 :
57 : /** select acceptor service implementation.
58 :
59 : Inherits from acceptor_service to enable runtime polymorphism.
60 : Uses key_type = acceptor_service for service lookup.
61 : */
62 : class BOOST_COROSIO_DECL select_acceptor_service final : public acceptor_service
63 : {
64 : public:
65 : explicit select_acceptor_service(capy::execution_context& ctx);
66 : ~select_acceptor_service() override;
67 :
68 : select_acceptor_service(select_acceptor_service const&) = delete;
69 : select_acceptor_service& operator=(select_acceptor_service const&) = delete;
70 :
71 : void shutdown() override;
72 :
73 : io_object::implementation* construct() override;
74 : void destroy(io_object::implementation*) override;
75 : void close(io_object::handle&) override;
76 : std::error_code open_acceptor_socket(
77 : tcp_acceptor::implementation& impl,
78 : int family,
79 : int type,
80 : int protocol) override;
81 : std::error_code
82 : bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
83 : std::error_code
84 : listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
85 :
86 3558 : select_scheduler& scheduler() const noexcept
87 : {
88 3558 : return state_->sched_;
89 : }
90 : void post(select_op* op);
91 : void work_started() noexcept;
92 : void work_finished() noexcept;
93 :
94 : /** Get the socket service for creating peer sockets during accept. */
95 : select_socket_service* socket_service() const noexcept;
96 :
97 : private:
98 : capy::execution_context& ctx_;
99 : std::unique_ptr<select_acceptor_state> state_;
100 : };
101 :
102 : inline void
103 MIS 0 : select_accept_op::cancel() noexcept
104 : {
105 0 : if (acceptor_impl_)
106 0 : acceptor_impl_->cancel_single_op(*this);
107 : else
108 0 : request_cancel();
109 0 : }
110 :
111 : inline void
112 HIT 3498 : select_accept_op::operator()()
113 : {
114 3498 : stop_cb.reset();
115 :
116 3498 : bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
117 :
118 3498 : if (ec_out)
119 : {
120 3498 : if (cancelled.load(std::memory_order_acquire))
121 3 : *ec_out = capy::error::canceled;
122 3495 : else if (errn != 0)
123 MIS 0 : *ec_out = make_err(errn);
124 : else
125 HIT 3495 : *ec_out = {};
126 : }
127 :
128 3498 : if (success && accepted_fd >= 0)
129 : {
130 3495 : if (acceptor_impl_)
131 : {
132 3495 : auto* socket_svc = static_cast<select_acceptor*>(acceptor_impl_)
133 3495 : ->service()
134 3495 : .socket_service();
135 3495 : if (socket_svc)
136 : {
137 : auto& impl =
138 3495 : static_cast<select_socket&>(*socket_svc->construct());
139 3495 : impl.set_socket(accepted_fd);
140 :
141 3495 : sockaddr_storage local_storage{};
142 3495 : socklen_t local_len = sizeof(local_storage);
143 3495 : sockaddr_storage remote_storage{};
144 3495 : socklen_t remote_len = sizeof(remote_storage);
145 :
146 3495 : endpoint local_ep, remote_ep;
147 3495 : if (::getsockname(
148 : accepted_fd,
149 : reinterpret_cast<sockaddr*>(&local_storage),
150 3495 : &local_len) == 0)
151 3495 : local_ep = from_sockaddr(local_storage);
152 3495 : if (::getpeername(
153 : accepted_fd,
154 : reinterpret_cast<sockaddr*>(&remote_storage),
155 3495 : &remote_len) == 0)
156 3495 : remote_ep = from_sockaddr(remote_storage);
157 :
158 3495 : impl.set_endpoints(local_ep, remote_ep);
159 :
160 3495 : if (impl_out)
161 3495 : *impl_out = &impl;
162 :
163 3495 : accepted_fd = -1;
164 : }
165 : else
166 : {
167 MIS 0 : if (ec_out && !*ec_out)
168 0 : *ec_out = make_err(ENOENT);
169 0 : ::close(accepted_fd);
170 0 : accepted_fd = -1;
171 0 : if (impl_out)
172 0 : *impl_out = nullptr;
173 : }
174 : }
175 : else
176 : {
177 0 : ::close(accepted_fd);
178 0 : accepted_fd = -1;
179 0 : if (impl_out)
180 0 : *impl_out = nullptr;
181 : }
182 HIT 3495 : }
183 : else
184 : {
185 3 : if (accepted_fd >= 0)
186 : {
187 MIS 0 : ::close(accepted_fd);
188 0 : accepted_fd = -1;
189 : }
190 :
191 HIT 3 : if (peer_impl)
192 : {
193 : auto* socket_svc_cleanup =
194 MIS 0 : static_cast<select_acceptor*>(acceptor_impl_)
195 0 : ->service()
196 0 : .socket_service();
197 0 : if (socket_svc_cleanup)
198 0 : socket_svc_cleanup->destroy(peer_impl);
199 0 : peer_impl = nullptr;
200 : }
201 :
202 HIT 3 : if (impl_out)
203 3 : *impl_out = nullptr;
204 : }
205 :
206 : // Move to stack before destroying the frame
207 3498 : capy::executor_ref saved_ex(ex);
208 3498 : std::coroutine_handle<> saved_h(h);
209 3498 : impl_ptr.reset();
210 3498 : dispatch_coro(saved_ex, saved_h).resume();
211 3498 : }
212 :
213 61 : inline select_acceptor::select_acceptor(select_acceptor_service& svc) noexcept
214 61 : : svc_(svc)
215 : {
216 61 : }
217 :
218 : inline std::coroutine_handle<>
219 3498 : select_acceptor::accept(
220 : std::coroutine_handle<> h,
221 : capy::executor_ref ex,
222 : std::stop_token token,
223 : std::error_code* ec,
224 : io_object::implementation** impl_out)
225 : {
226 3498 : auto& op = acc_;
227 3498 : op.reset();
228 3498 : op.h = h;
229 3498 : op.ex = ex;
230 3498 : op.ec_out = ec;
231 3498 : op.impl_out = impl_out;
232 3498 : op.fd = fd_;
233 3498 : op.start(token, this);
234 :
235 3498 : sockaddr_storage peer_storage{};
236 3498 : socklen_t addrlen = sizeof(peer_storage);
237 : int accepted =
238 3498 : ::accept(fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen);
239 :
240 3498 : if (accepted >= 0)
241 : {
242 : // Reject fds that exceed select()'s FD_SETSIZE limit.
243 2 : if (accepted >= FD_SETSIZE)
244 : {
245 MIS 0 : ::close(accepted);
246 0 : op.accepted_fd = -1;
247 0 : op.complete(EINVAL, 0);
248 0 : op.impl_ptr = shared_from_this();
249 0 : svc_.post(&op);
250 0 : return std::noop_coroutine();
251 : }
252 :
253 : // Set non-blocking and close-on-exec flags.
254 HIT 2 : int flags = ::fcntl(accepted, F_GETFL, 0);
255 2 : if (flags == -1)
256 : {
257 MIS 0 : int err = errno;
258 0 : ::close(accepted);
259 0 : op.accepted_fd = -1;
260 0 : op.complete(err, 0);
261 0 : op.impl_ptr = shared_from_this();
262 0 : svc_.post(&op);
263 0 : return std::noop_coroutine();
264 : }
265 :
266 HIT 2 : if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
267 : {
268 MIS 0 : int err = errno;
269 0 : ::close(accepted);
270 0 : op.accepted_fd = -1;
271 0 : op.complete(err, 0);
272 0 : op.impl_ptr = shared_from_this();
273 0 : svc_.post(&op);
274 0 : return std::noop_coroutine();
275 : }
276 :
277 HIT 2 : if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
278 : {
279 MIS 0 : int err = errno;
280 0 : ::close(accepted);
281 0 : op.accepted_fd = -1;
282 0 : op.complete(err, 0);
283 0 : op.impl_ptr = shared_from_this();
284 0 : svc_.post(&op);
285 0 : return std::noop_coroutine();
286 : }
287 :
288 HIT 2 : op.accepted_fd = accepted;
289 2 : op.complete(0, 0);
290 2 : op.impl_ptr = shared_from_this();
291 2 : svc_.post(&op);
292 2 : return std::noop_coroutine();
293 : }
294 :
295 3496 : if (errno == EAGAIN || errno == EWOULDBLOCK)
296 : {
297 3496 : svc_.work_started();
298 3496 : op.impl_ptr = shared_from_this();
299 :
300 : // Set registering BEFORE register_fd to close the race window where
301 : // reactor sees an event before we set registered.
302 3496 : op.registered.store(
303 : select_registration_state::registering, std::memory_order_release);
304 3496 : svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
305 :
306 : // Transition to registered. If this fails, reactor or cancel already
307 : // claimed the op (state is now unregistered), so we're done. However,
308 : // we must still deregister the fd because cancel's deregister_fd may
309 : // have run before our register_fd, leaving the fd orphaned.
310 3496 : auto expected = select_registration_state::registering;
311 3496 : if (!op.registered.compare_exchange_strong(
312 : expected, select_registration_state::registered,
313 : std::memory_order_acq_rel))
314 : {
315 MIS 0 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
316 0 : return std::noop_coroutine();
317 : }
318 :
319 : // If cancelled was set before we registered, handle it now.
320 HIT 3496 : if (op.cancelled.load(std::memory_order_acquire))
321 : {
322 MIS 0 : auto prev = op.registered.exchange(
323 : select_registration_state::unregistered,
324 : std::memory_order_acq_rel);
325 0 : if (prev != select_registration_state::unregistered)
326 : {
327 0 : svc_.scheduler().deregister_fd(
328 : fd_, select_scheduler::event_read);
329 0 : op.impl_ptr = shared_from_this();
330 0 : svc_.post(&op);
331 0 : svc_.work_finished();
332 : }
333 : }
334 HIT 3496 : return std::noop_coroutine();
335 : }
336 :
337 MIS 0 : op.complete(errno, 0);
338 0 : op.impl_ptr = shared_from_this();
339 0 : svc_.post(&op);
340 0 : return std::noop_coroutine();
341 : }
342 :
343 : inline void
344 HIT 2 : select_acceptor::cancel() noexcept
345 : {
346 2 : auto self = weak_from_this().lock();
347 2 : if (!self)
348 MIS 0 : return;
349 :
350 HIT 2 : auto prev = acc_.registered.exchange(
351 : select_registration_state::unregistered, std::memory_order_acq_rel);
352 2 : acc_.request_cancel();
353 :
354 2 : if (prev != select_registration_state::unregistered)
355 : {
356 1 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
357 1 : acc_.impl_ptr = self;
358 1 : svc_.post(&acc_);
359 1 : svc_.work_finished();
360 : }
361 2 : }
362 :
363 : inline void
364 MIS 0 : select_acceptor::cancel_single_op(select_op& op) noexcept
365 : {
366 0 : auto self = weak_from_this().lock();
367 0 : if (!self)
368 0 : return;
369 :
370 0 : auto prev = op.registered.exchange(
371 : select_registration_state::unregistered, std::memory_order_acq_rel);
372 0 : op.request_cancel();
373 :
374 0 : if (prev != select_registration_state::unregistered)
375 : {
376 0 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
377 :
378 0 : op.impl_ptr = self;
379 0 : svc_.post(&op);
380 0 : svc_.work_finished();
381 : }
382 0 : }
383 :
384 : inline void
385 HIT 240 : select_acceptor::close_socket() noexcept
386 : {
387 240 : auto self = weak_from_this().lock();
388 240 : if (self)
389 : {
390 240 : auto prev = acc_.registered.exchange(
391 : select_registration_state::unregistered, std::memory_order_acq_rel);
392 240 : acc_.request_cancel();
393 :
394 240 : if (prev != select_registration_state::unregistered)
395 : {
396 2 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
397 2 : acc_.impl_ptr = self;
398 2 : svc_.post(&acc_);
399 2 : svc_.work_finished();
400 : }
401 : }
402 :
403 240 : if (fd_ >= 0)
404 : {
405 59 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
406 59 : ::close(fd_);
407 59 : fd_ = -1;
408 : }
409 :
410 240 : local_endpoint_ = endpoint{};
411 240 : }
412 :
413 168 : inline select_acceptor_service::select_acceptor_service(
414 168 : capy::execution_context& ctx)
415 168 : : ctx_(ctx)
416 168 : , state_(
417 : std::make_unique<select_acceptor_state>(
418 168 : ctx.use_service<select_scheduler>()))
419 : {
420 168 : }
421 :
422 336 : inline select_acceptor_service::~select_acceptor_service() {}
423 :
424 : inline void
425 168 : select_acceptor_service::shutdown()
426 : {
427 168 : std::lock_guard lock(state_->mutex_);
428 :
429 168 : while (auto* impl = state_->acceptor_list_.pop_front())
430 MIS 0 : impl->close_socket();
431 :
432 : // Don't clear acceptor_ptrs_ here — same rationale as
433 : // select_socket_service::shutdown(). Let ~state_ release ptrs
434 : // after scheduler shutdown has drained all queued ops.
435 HIT 168 : }
436 :
437 : inline io_object::implementation*
438 61 : select_acceptor_service::construct()
439 : {
440 61 : auto impl = std::make_shared<select_acceptor>(*this);
441 61 : auto* raw = impl.get();
442 :
443 61 : std::lock_guard lock(state_->mutex_);
444 61 : state_->acceptor_list_.push_back(raw);
445 61 : state_->acceptor_ptrs_.emplace(raw, std::move(impl));
446 :
447 61 : return raw;
448 61 : }
449 :
450 : inline void
451 61 : select_acceptor_service::destroy(io_object::implementation* impl)
452 : {
453 61 : auto* select_impl = static_cast<select_acceptor*>(impl);
454 61 : select_impl->close_socket();
455 61 : std::lock_guard lock(state_->mutex_);
456 61 : state_->acceptor_list_.remove(select_impl);
457 61 : state_->acceptor_ptrs_.erase(select_impl);
458 61 : }
459 :
460 : inline void
461 120 : select_acceptor_service::close(io_object::handle& h)
462 : {
463 120 : static_cast<select_acceptor*>(h.get())->close_socket();
464 120 : }
465 :
466 : inline std::error_code
467 58 : select_acceptor::set_option(
468 : int level, int optname, void const* data, std::size_t size) noexcept
469 : {
470 58 : if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
471 : 0)
472 MIS 0 : return make_err(errno);
473 HIT 58 : return {};
474 : }
475 :
476 : inline std::error_code
477 MIS 0 : select_acceptor::get_option(
478 : int level, int optname, void* data, std::size_t* size) const noexcept
479 : {
480 0 : socklen_t len = static_cast<socklen_t>(*size);
481 0 : if (::getsockopt(fd_, level, optname, data, &len) != 0)
482 0 : return make_err(errno);
483 0 : *size = static_cast<std::size_t>(len);
484 0 : return {};
485 : }
486 :
487 : inline std::error_code
488 HIT 59 : select_acceptor_service::open_acceptor_socket(
489 : tcp_acceptor::implementation& impl, int family, int type, int protocol)
490 : {
491 59 : auto* select_impl = static_cast<select_acceptor*>(&impl);
492 59 : select_impl->close_socket();
493 :
494 59 : int fd = ::socket(family, type, protocol);
495 59 : if (fd < 0)
496 MIS 0 : return make_err(errno);
497 :
498 : // Set non-blocking and close-on-exec
499 HIT 59 : int flags = ::fcntl(fd, F_GETFL, 0);
500 59 : if (flags == -1)
501 : {
502 MIS 0 : int errn = errno;
503 0 : ::close(fd);
504 0 : return make_err(errn);
505 : }
506 HIT 59 : if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
507 : {
508 MIS 0 : int errn = errno;
509 0 : ::close(fd);
510 0 : return make_err(errn);
511 : }
512 HIT 59 : if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
513 : {
514 MIS 0 : int errn = errno;
515 0 : ::close(fd);
516 0 : return make_err(errn);
517 : }
518 :
519 HIT 59 : if (fd >= FD_SETSIZE)
520 : {
521 MIS 0 : ::close(fd);
522 0 : return make_err(EMFILE);
523 : }
524 :
525 HIT 59 : if (family == AF_INET6)
526 : {
527 8 : int val = 0; // dual-stack default
528 8 : ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
529 : }
530 :
531 59 : select_impl->fd_ = fd;
532 59 : return {};
533 : }
534 :
535 : inline std::error_code
536 58 : select_acceptor_service::bind_acceptor(
537 : tcp_acceptor::implementation& impl, endpoint ep)
538 : {
539 58 : auto* select_impl = static_cast<select_acceptor*>(&impl);
540 58 : int fd = select_impl->fd_;
541 :
542 58 : sockaddr_storage storage{};
543 58 : socklen_t addrlen = detail::to_sockaddr(ep, storage);
544 58 : if (::bind(fd, reinterpret_cast<sockaddr*>(&storage), addrlen) < 0)
545 1 : return make_err(errno);
546 :
547 : // Cache local endpoint (resolves ephemeral port)
548 57 : sockaddr_storage local{};
549 57 : socklen_t local_len = sizeof(local);
550 57 : if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local), &local_len) == 0)
551 57 : select_impl->set_local_endpoint(detail::from_sockaddr(local));
552 :
553 57 : return {};
554 : }
555 :
556 : inline std::error_code
557 57 : select_acceptor_service::listen_acceptor(
558 : tcp_acceptor::implementation& impl, int backlog)
559 : {
560 57 : auto* select_impl = static_cast<select_acceptor*>(&impl);
561 57 : int fd = select_impl->fd_;
562 :
563 57 : if (::listen(fd, backlog) < 0)
564 MIS 0 : return make_err(errno);
565 :
566 HIT 57 : return {};
567 : }
568 :
569 : inline void
570 5 : select_acceptor_service::post(select_op* op)
571 : {
572 5 : state_->sched_.post(op);
573 5 : }
574 :
575 : inline void
576 3496 : select_acceptor_service::work_started() noexcept
577 : {
578 3496 : state_->sched_.work_started();
579 3496 : }
580 :
581 : inline void
582 3 : select_acceptor_service::work_finished() noexcept
583 : {
584 3 : state_->sched_.work_finished();
585 3 : }
586 :
587 : inline select_socket_service*
588 3495 : select_acceptor_service::socket_service() const noexcept
589 : {
590 3495 : auto* svc = ctx_.find_service<detail::socket_service>();
591 3495 : return svc ? dynamic_cast<select_socket_service*>(svc) : nullptr;
592 : }
593 :
594 : } // namespace boost::corosio::detail
595 :
596 : #endif // BOOST_COROSIO_HAS_SELECT
597 :
598 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_ACCEPTOR_SERVICE_HPP
|