include/boost/corosio/native/detail/select/select_acceptor_service.hpp

65.5% Lines (184/281) 87.5% Functions (21/24)
include/boost/corosio/native/detail/select/select_acceptor_service.hpp
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_ACCEPTOR_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_ACCEPTOR_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/acceptor_service.hpp>
20
21 #include <boost/corosio/native/detail/select/select_acceptor.hpp>
22 #include <boost/corosio/native/detail/select/select_socket_service.hpp>
23 #include <boost/corosio/native/detail/select/select_scheduler.hpp>
24
25 #include <boost/corosio/native/detail/endpoint_convert.hpp>
26 #include <boost/corosio/detail/dispatch_coro.hpp>
27 #include <boost/corosio/native/detail/make_err.hpp>
28
29 #include <errno.h>
30 #include <fcntl.h>
31 #include <netinet/in.h>
32 #include <sys/socket.h>
33 #include <unistd.h>
34
35 #include <memory>
36 #include <mutex>
37 #include <unordered_map>
38
39 namespace boost::corosio::detail {
40
41 /** State for select acceptor service. */
42 class select_acceptor_state
43 {
44 public:
45 168 explicit select_acceptor_state(select_scheduler& sched) noexcept
46 168 : sched_(sched)
47 {
48 168 }
49
50 select_scheduler& sched_;
51 std::mutex mutex_;
52 intrusive_list<select_acceptor> acceptor_list_;
53 std::unordered_map<select_acceptor*, std::shared_ptr<select_acceptor>>
54 acceptor_ptrs_;
55 };
56
57 /** select acceptor service implementation.
58
59 Inherits from acceptor_service to enable runtime polymorphism.
60 Uses key_type = acceptor_service for service lookup.
61 */
62 class BOOST_COROSIO_DECL select_acceptor_service final : public acceptor_service
63 {
64 public:
65 explicit select_acceptor_service(capy::execution_context& ctx);
66 ~select_acceptor_service() override;
67
68 select_acceptor_service(select_acceptor_service const&) = delete;
69 select_acceptor_service& operator=(select_acceptor_service const&) = delete;
70
71 void shutdown() override;
72
73 io_object::implementation* construct() override;
74 void destroy(io_object::implementation*) override;
75 void close(io_object::handle&) override;
76 std::error_code open_acceptor_socket(
77 tcp_acceptor::implementation& impl,
78 int family,
79 int type,
80 int protocol) override;
81 std::error_code
82 bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
83 std::error_code
84 listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
85
86 3558 select_scheduler& scheduler() const noexcept
87 {
88 3558 return state_->sched_;
89 }
90 void post(select_op* op);
91 void work_started() noexcept;
92 void work_finished() noexcept;
93
94 /** Get the socket service for creating peer sockets during accept. */
95 select_socket_service* socket_service() const noexcept;
96
97 private:
98 capy::execution_context& ctx_;
99 std::unique_ptr<select_acceptor_state> state_;
100 };
101
102 inline void
103 select_accept_op::cancel() noexcept
104 {
105 if (acceptor_impl_)
106 acceptor_impl_->cancel_single_op(*this);
107 else
108 request_cancel();
109 }
110
111 inline void
112 3498 select_accept_op::operator()()
113 {
114 3498 stop_cb.reset();
115
116 3498 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
117
118 3498 if (ec_out)
119 {
120 3498 if (cancelled.load(std::memory_order_acquire))
121 3 *ec_out = capy::error::canceled;
122 3495 else if (errn != 0)
123 *ec_out = make_err(errn);
124 else
125 3495 *ec_out = {};
126 }
127
128 3498 if (success && accepted_fd >= 0)
129 {
130 3495 if (acceptor_impl_)
131 {
132 3495 auto* socket_svc = static_cast<select_acceptor*>(acceptor_impl_)
133 3495 ->service()
134 3495 .socket_service();
135 3495 if (socket_svc)
136 {
137 auto& impl =
138 3495 static_cast<select_socket&>(*socket_svc->construct());
139 3495 impl.set_socket(accepted_fd);
140
141 3495 sockaddr_storage local_storage{};
142 3495 socklen_t local_len = sizeof(local_storage);
143 3495 sockaddr_storage remote_storage{};
144 3495 socklen_t remote_len = sizeof(remote_storage);
145
146 3495 endpoint local_ep, remote_ep;
147 3495 if (::getsockname(
148 accepted_fd,
149 reinterpret_cast<sockaddr*>(&local_storage),
150 3495 &local_len) == 0)
151 3495 local_ep = from_sockaddr(local_storage);
152 3495 if (::getpeername(
153 accepted_fd,
154 reinterpret_cast<sockaddr*>(&remote_storage),
155 3495 &remote_len) == 0)
156 3495 remote_ep = from_sockaddr(remote_storage);
157
158 3495 impl.set_endpoints(local_ep, remote_ep);
159
160 3495 if (impl_out)
161 3495 *impl_out = &impl;
162
163 3495 accepted_fd = -1;
164 }
165 else
166 {
167 if (ec_out && !*ec_out)
168 *ec_out = make_err(ENOENT);
169 ::close(accepted_fd);
170 accepted_fd = -1;
171 if (impl_out)
172 *impl_out = nullptr;
173 }
174 }
175 else
176 {
177 ::close(accepted_fd);
178 accepted_fd = -1;
179 if (impl_out)
180 *impl_out = nullptr;
181 }
182 3495 }
183 else
184 {
185 3 if (accepted_fd >= 0)
186 {
187 ::close(accepted_fd);
188 accepted_fd = -1;
189 }
190
191 3 if (peer_impl)
192 {
193 auto* socket_svc_cleanup =
194 static_cast<select_acceptor*>(acceptor_impl_)
195 ->service()
196 .socket_service();
197 if (socket_svc_cleanup)
198 socket_svc_cleanup->destroy(peer_impl);
199 peer_impl = nullptr;
200 }
201
202 3 if (impl_out)
203 3 *impl_out = nullptr;
204 }
205
206 // Move to stack before destroying the frame
207 3498 capy::executor_ref saved_ex(ex);
208 3498 std::coroutine_handle<> saved_h(h);
209 3498 impl_ptr.reset();
210 3498 dispatch_coro(saved_ex, saved_h).resume();
211 3498 }
212
213 61 inline select_acceptor::select_acceptor(select_acceptor_service& svc) noexcept
214 61 : svc_(svc)
215 {
216 61 }
217
218 inline std::coroutine_handle<>
219 3498 select_acceptor::accept(
220 std::coroutine_handle<> h,
221 capy::executor_ref ex,
222 std::stop_token token,
223 std::error_code* ec,
224 io_object::implementation** impl_out)
225 {
226 3498 auto& op = acc_;
227 3498 op.reset();
228 3498 op.h = h;
229 3498 op.ex = ex;
230 3498 op.ec_out = ec;
231 3498 op.impl_out = impl_out;
232 3498 op.fd = fd_;
233 3498 op.start(token, this);
234
235 3498 sockaddr_storage peer_storage{};
236 3498 socklen_t addrlen = sizeof(peer_storage);
237 int accepted =
238 3498 ::accept(fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen);
239
240 3498 if (accepted >= 0)
241 {
242 // Reject fds that exceed select()'s FD_SETSIZE limit.
243 2 if (accepted >= FD_SETSIZE)
244 {
245 ::close(accepted);
246 op.accepted_fd = -1;
247 op.complete(EINVAL, 0);
248 op.impl_ptr = shared_from_this();
249 svc_.post(&op);
250 return std::noop_coroutine();
251 }
252
253 // Set non-blocking and close-on-exec flags.
254 2 int flags = ::fcntl(accepted, F_GETFL, 0);
255 2 if (flags == -1)
256 {
257 int err = errno;
258 ::close(accepted);
259 op.accepted_fd = -1;
260 op.complete(err, 0);
261 op.impl_ptr = shared_from_this();
262 svc_.post(&op);
263 return std::noop_coroutine();
264 }
265
266 2 if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
267 {
268 int err = errno;
269 ::close(accepted);
270 op.accepted_fd = -1;
271 op.complete(err, 0);
272 op.impl_ptr = shared_from_this();
273 svc_.post(&op);
274 return std::noop_coroutine();
275 }
276
277 2 if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
278 {
279 int err = errno;
280 ::close(accepted);
281 op.accepted_fd = -1;
282 op.complete(err, 0);
283 op.impl_ptr = shared_from_this();
284 svc_.post(&op);
285 return std::noop_coroutine();
286 }
287
288 2 op.accepted_fd = accepted;
289 2 op.complete(0, 0);
290 2 op.impl_ptr = shared_from_this();
291 2 svc_.post(&op);
292 2 return std::noop_coroutine();
293 }
294
295 3496 if (errno == EAGAIN || errno == EWOULDBLOCK)
296 {
297 3496 svc_.work_started();
298 3496 op.impl_ptr = shared_from_this();
299
300 // Set registering BEFORE register_fd to close the race window where
301 // reactor sees an event before we set registered.
302 3496 op.registered.store(
303 select_registration_state::registering, std::memory_order_release);
304 3496 svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
305
306 // Transition to registered. If this fails, reactor or cancel already
307 // claimed the op (state is now unregistered), so we're done. However,
308 // we must still deregister the fd because cancel's deregister_fd may
309 // have run before our register_fd, leaving the fd orphaned.
310 3496 auto expected = select_registration_state::registering;
311 3496 if (!op.registered.compare_exchange_strong(
312 expected, select_registration_state::registered,
313 std::memory_order_acq_rel))
314 {
315 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
316 return std::noop_coroutine();
317 }
318
319 // If cancelled was set before we registered, handle it now.
320 3496 if (op.cancelled.load(std::memory_order_acquire))
321 {
322 auto prev = op.registered.exchange(
323 select_registration_state::unregistered,
324 std::memory_order_acq_rel);
325 if (prev != select_registration_state::unregistered)
326 {
327 svc_.scheduler().deregister_fd(
328 fd_, select_scheduler::event_read);
329 op.impl_ptr = shared_from_this();
330 svc_.post(&op);
331 svc_.work_finished();
332 }
333 }
334 3496 return std::noop_coroutine();
335 }
336
337 op.complete(errno, 0);
338 op.impl_ptr = shared_from_this();
339 svc_.post(&op);
340 return std::noop_coroutine();
341 }
342
343 inline void
344 2 select_acceptor::cancel() noexcept
345 {
346 2 auto self = weak_from_this().lock();
347 2 if (!self)
348 return;
349
350 2 auto prev = acc_.registered.exchange(
351 select_registration_state::unregistered, std::memory_order_acq_rel);
352 2 acc_.request_cancel();
353
354 2 if (prev != select_registration_state::unregistered)
355 {
356 1 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
357 1 acc_.impl_ptr = self;
358 1 svc_.post(&acc_);
359 1 svc_.work_finished();
360 }
361 2 }
362
363 inline void
364 select_acceptor::cancel_single_op(select_op& op) noexcept
365 {
366 auto self = weak_from_this().lock();
367 if (!self)
368 return;
369
370 auto prev = op.registered.exchange(
371 select_registration_state::unregistered, std::memory_order_acq_rel);
372 op.request_cancel();
373
374 if (prev != select_registration_state::unregistered)
375 {
376 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
377
378 op.impl_ptr = self;
379 svc_.post(&op);
380 svc_.work_finished();
381 }
382 }
383
384 inline void
385 240 select_acceptor::close_socket() noexcept
386 {
387 240 auto self = weak_from_this().lock();
388 240 if (self)
389 {
390 240 auto prev = acc_.registered.exchange(
391 select_registration_state::unregistered, std::memory_order_acq_rel);
392 240 acc_.request_cancel();
393
394 240 if (prev != select_registration_state::unregistered)
395 {
396 2 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
397 2 acc_.impl_ptr = self;
398 2 svc_.post(&acc_);
399 2 svc_.work_finished();
400 }
401 }
402
403 240 if (fd_ >= 0)
404 {
405 59 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
406 59 ::close(fd_);
407 59 fd_ = -1;
408 }
409
410 240 local_endpoint_ = endpoint{};
411 240 }
412
413 168 inline select_acceptor_service::select_acceptor_service(
414 168 capy::execution_context& ctx)
415 168 : ctx_(ctx)
416 168 , state_(
417 std::make_unique<select_acceptor_state>(
418 168 ctx.use_service<select_scheduler>()))
419 {
420 168 }
421
422 336 inline select_acceptor_service::~select_acceptor_service() {}
423
424 inline void
425 168 select_acceptor_service::shutdown()
426 {
427 168 std::lock_guard lock(state_->mutex_);
428
429 168 while (auto* impl = state_->acceptor_list_.pop_front())
430 impl->close_socket();
431
432 // Don't clear acceptor_ptrs_ here — same rationale as
433 // select_socket_service::shutdown(). Let ~state_ release ptrs
434 // after scheduler shutdown has drained all queued ops.
435 168 }
436
437 inline io_object::implementation*
438 61 select_acceptor_service::construct()
439 {
440 61 auto impl = std::make_shared<select_acceptor>(*this);
441 61 auto* raw = impl.get();
442
443 61 std::lock_guard lock(state_->mutex_);
444 61 state_->acceptor_list_.push_back(raw);
445 61 state_->acceptor_ptrs_.emplace(raw, std::move(impl));
446
447 61 return raw;
448 61 }
449
450 inline void
451 61 select_acceptor_service::destroy(io_object::implementation* impl)
452 {
453 61 auto* select_impl = static_cast<select_acceptor*>(impl);
454 61 select_impl->close_socket();
455 61 std::lock_guard lock(state_->mutex_);
456 61 state_->acceptor_list_.remove(select_impl);
457 61 state_->acceptor_ptrs_.erase(select_impl);
458 61 }
459
460 inline void
461 120 select_acceptor_service::close(io_object::handle& h)
462 {
463 120 static_cast<select_acceptor*>(h.get())->close_socket();
464 120 }
465
466 inline std::error_code
467 58 select_acceptor::set_option(
468 int level, int optname, void const* data, std::size_t size) noexcept
469 {
470 58 if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
471 0)
472 return make_err(errno);
473 58 return {};
474 }
475
476 inline std::error_code
477 select_acceptor::get_option(
478 int level, int optname, void* data, std::size_t* size) const noexcept
479 {
480 socklen_t len = static_cast<socklen_t>(*size);
481 if (::getsockopt(fd_, level, optname, data, &len) != 0)
482 return make_err(errno);
483 *size = static_cast<std::size_t>(len);
484 return {};
485 }
486
487 inline std::error_code
488 59 select_acceptor_service::open_acceptor_socket(
489 tcp_acceptor::implementation& impl, int family, int type, int protocol)
490 {
491 59 auto* select_impl = static_cast<select_acceptor*>(&impl);
492 59 select_impl->close_socket();
493
494 59 int fd = ::socket(family, type, protocol);
495 59 if (fd < 0)
496 return make_err(errno);
497
498 // Set non-blocking and close-on-exec
499 59 int flags = ::fcntl(fd, F_GETFL, 0);
500 59 if (flags == -1)
501 {
502 int errn = errno;
503 ::close(fd);
504 return make_err(errn);
505 }
506 59 if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
507 {
508 int errn = errno;
509 ::close(fd);
510 return make_err(errn);
511 }
512 59 if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
513 {
514 int errn = errno;
515 ::close(fd);
516 return make_err(errn);
517 }
518
519 59 if (fd >= FD_SETSIZE)
520 {
521 ::close(fd);
522 return make_err(EMFILE);
523 }
524
525 59 if (family == AF_INET6)
526 {
527 8 int val = 0; // dual-stack default
528 8 ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
529 }
530
531 59 select_impl->fd_ = fd;
532 59 return {};
533 }
534
535 inline std::error_code
536 58 select_acceptor_service::bind_acceptor(
537 tcp_acceptor::implementation& impl, endpoint ep)
538 {
539 58 auto* select_impl = static_cast<select_acceptor*>(&impl);
540 58 int fd = select_impl->fd_;
541
542 58 sockaddr_storage storage{};
543 58 socklen_t addrlen = detail::to_sockaddr(ep, storage);
544 58 if (::bind(fd, reinterpret_cast<sockaddr*>(&storage), addrlen) < 0)
545 1 return make_err(errno);
546
547 // Cache local endpoint (resolves ephemeral port)
548 57 sockaddr_storage local{};
549 57 socklen_t local_len = sizeof(local);
550 57 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local), &local_len) == 0)
551 57 select_impl->set_local_endpoint(detail::from_sockaddr(local));
552
553 57 return {};
554 }
555
556 inline std::error_code
557 57 select_acceptor_service::listen_acceptor(
558 tcp_acceptor::implementation& impl, int backlog)
559 {
560 57 auto* select_impl = static_cast<select_acceptor*>(&impl);
561 57 int fd = select_impl->fd_;
562
563 57 if (::listen(fd, backlog) < 0)
564 return make_err(errno);
565
566 57 return {};
567 }
568
569 inline void
570 5 select_acceptor_service::post(select_op* op)
571 {
572 5 state_->sched_.post(op);
573 5 }
574
575 inline void
576 3496 select_acceptor_service::work_started() noexcept
577 {
578 3496 state_->sched_.work_started();
579 3496 }
580
581 inline void
582 3 select_acceptor_service::work_finished() noexcept
583 {
584 3 state_->sched_.work_finished();
585 3 }
586
587 inline select_socket_service*
588 3495 select_acceptor_service::socket_service() const noexcept
589 {
590 3495 auto* svc = ctx_.find_service<detail::socket_service>();
591 3495 return svc ? dynamic_cast<select_socket_service*>(svc) : nullptr;
592 }
593
594 } // namespace boost::corosio::detail
595
596 #endif // BOOST_COROSIO_HAS_SELECT
597
598 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_ACCEPTOR_SERVICE_HPP
599