TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/acceptor_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/epoll/epoll_acceptor.hpp>
22 : #include <boost/corosio/native/detail/epoll/epoll_socket_service.hpp>
23 : #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
24 :
25 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
26 : #include <boost/corosio/detail/dispatch_coro.hpp>
27 : #include <boost/corosio/native/detail/make_err.hpp>
28 :
29 : #include <memory>
30 : #include <mutex>
31 : #include <unordered_map>
32 : #include <utility>
33 :
34 : #include <errno.h>
35 : #include <netinet/in.h>
36 : #include <sys/epoll.h>
37 : #include <sys/socket.h>
38 : #include <unistd.h>
39 :
40 : namespace boost::corosio::detail {
41 :
42 : /** State for epoll acceptor service. */
43 : class epoll_acceptor_state
44 : {
45 : public:
46 HIT 239 : explicit epoll_acceptor_state(epoll_scheduler& sched) noexcept
47 239 : : sched_(sched)
48 : {
49 239 : }
50 :
51 : epoll_scheduler& sched_;
52 : std::mutex mutex_;
53 : intrusive_list<epoll_acceptor> acceptor_list_;
54 : std::unordered_map<epoll_acceptor*, std::shared_ptr<epoll_acceptor>>
55 : acceptor_ptrs_;
56 : };
57 :
58 : /** epoll acceptor service implementation.
59 :
60 : Inherits from acceptor_service to enable runtime polymorphism.
61 : Uses key_type = acceptor_service for service lookup.
62 : */
63 : class BOOST_COROSIO_DECL epoll_acceptor_service final : public acceptor_service
64 : {
65 : public:
66 : explicit epoll_acceptor_service(capy::execution_context& ctx);
67 : ~epoll_acceptor_service() override;
68 :
69 : epoll_acceptor_service(epoll_acceptor_service const&) = delete;
70 : epoll_acceptor_service& operator=(epoll_acceptor_service const&) = delete;
71 :
72 : void shutdown() override;
73 :
74 : io_object::implementation* construct() override;
75 : void destroy(io_object::implementation*) override;
76 : void close(io_object::handle&) override;
77 : std::error_code open_acceptor_socket(
78 : tcp_acceptor::implementation& impl,
79 : int family,
80 : int type,
81 : int protocol) override;
82 : std::error_code
83 : bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
84 : std::error_code
85 : listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
86 :
87 4898 : epoll_scheduler& scheduler() const noexcept
88 : {
89 4898 : return state_->sched_;
90 : }
91 : void post(epoll_op* op);
92 : void work_started() noexcept;
93 : void work_finished() noexcept;
94 :
95 : /** Get the socket service for creating peer sockets during accept. */
96 : epoll_socket_service* socket_service() const noexcept;
97 :
98 : private:
99 : capy::execution_context& ctx_;
100 : std::unique_ptr<epoll_acceptor_state> state_;
101 : };
102 :
103 : //--------------------------------------------------------------------------
104 : //
105 : // Implementation
106 : //
107 : //--------------------------------------------------------------------------
108 :
109 : inline void
110 6 : epoll_accept_op::cancel() noexcept
111 : {
112 6 : if (acceptor_impl_)
113 6 : acceptor_impl_->cancel_single_op(*this);
114 : else
115 MIS 0 : request_cancel();
116 HIT 6 : }
117 :
118 : inline void
119 4742 : epoll_accept_op::operator()()
120 : {
121 4742 : stop_cb.reset();
122 :
123 4742 : static_cast<epoll_acceptor*>(acceptor_impl_)
124 4742 : ->service()
125 4742 : .scheduler()
126 4742 : .reset_inline_budget();
127 :
128 4742 : bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
129 :
130 4742 : if (cancelled.load(std::memory_order_acquire))
131 9 : *ec_out = capy::error::canceled;
132 4733 : else if (errn != 0)
133 MIS 0 : *ec_out = make_err(errn);
134 : else
135 HIT 4733 : *ec_out = {};
136 :
137 : // Set up the peer socket on success
138 4742 : if (success && accepted_fd >= 0 && acceptor_impl_)
139 : {
140 4733 : auto* socket_svc = static_cast<epoll_acceptor*>(acceptor_impl_)
141 4733 : ->service()
142 4733 : .socket_service();
143 4733 : if (socket_svc)
144 : {
145 4733 : auto& impl = static_cast<epoll_socket&>(*socket_svc->construct());
146 4733 : impl.set_socket(accepted_fd);
147 :
148 4733 : impl.desc_state_.fd = accepted_fd;
149 : {
150 4733 : std::lock_guard lock(impl.desc_state_.mutex);
151 4733 : impl.desc_state_.read_op = nullptr;
152 4733 : impl.desc_state_.write_op = nullptr;
153 4733 : impl.desc_state_.connect_op = nullptr;
154 4733 : }
155 4733 : socket_svc->scheduler().register_descriptor(
156 : accepted_fd, &impl.desc_state_);
157 :
158 4733 : impl.set_endpoints(
159 4733 : static_cast<epoll_acceptor*>(acceptor_impl_)->local_endpoint(),
160 4733 : from_sockaddr(peer_storage));
161 :
162 4733 : if (impl_out)
163 4733 : *impl_out = &impl;
164 4733 : accepted_fd = -1;
165 : }
166 : else
167 : {
168 : // No socket service — treat as error
169 MIS 0 : *ec_out = make_err(ENOENT);
170 0 : success = false;
171 : }
172 : }
173 :
174 HIT 4742 : if (!success || !acceptor_impl_)
175 : {
176 9 : if (accepted_fd >= 0)
177 : {
178 MIS 0 : ::close(accepted_fd);
179 0 : accepted_fd = -1;
180 : }
181 HIT 9 : if (impl_out)
182 9 : *impl_out = nullptr;
183 : }
184 :
185 : // Move to stack before resuming. See epoll_op::operator()() for rationale.
186 4742 : capy::executor_ref saved_ex(ex);
187 4742 : std::coroutine_handle<> saved_h(h);
188 4742 : auto prevent_premature_destruction = std::move(impl_ptr);
189 4742 : dispatch_coro(saved_ex, saved_h).resume();
190 4742 : }
191 :
192 82 : inline epoll_acceptor::epoll_acceptor(epoll_acceptor_service& svc) noexcept
193 82 : : svc_(svc)
194 : {
195 82 : }
196 :
197 : inline std::coroutine_handle<>
198 4742 : epoll_acceptor::accept(
199 : std::coroutine_handle<> h,
200 : capy::executor_ref ex,
201 : std::stop_token token,
202 : std::error_code* ec,
203 : io_object::implementation** impl_out)
204 : {
205 4742 : auto& op = acc_;
206 4742 : op.reset();
207 4742 : op.h = h;
208 4742 : op.ex = ex;
209 4742 : op.ec_out = ec;
210 4742 : op.impl_out = impl_out;
211 4742 : op.fd = fd_;
212 4742 : op.start(token, this);
213 :
214 4742 : sockaddr_storage peer_storage{};
215 4742 : socklen_t addrlen = sizeof(peer_storage);
216 : int accepted;
217 : do
218 : {
219 4742 : accepted = ::accept4(
220 : fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
221 : SOCK_NONBLOCK | SOCK_CLOEXEC);
222 : }
223 4742 : while (accepted < 0 && errno == EINTR);
224 :
225 4742 : if (accepted >= 0)
226 : {
227 : {
228 2 : std::lock_guard lock(desc_state_.mutex);
229 2 : desc_state_.read_ready = false;
230 2 : }
231 :
232 2 : if (svc_.scheduler().try_consume_inline_budget())
233 : {
234 MIS 0 : auto* socket_svc = svc_.socket_service();
235 0 : if (socket_svc)
236 : {
237 : auto& impl =
238 0 : static_cast<epoll_socket&>(*socket_svc->construct());
239 0 : impl.set_socket(accepted);
240 :
241 0 : impl.desc_state_.fd = accepted;
242 : {
243 0 : std::lock_guard lock(impl.desc_state_.mutex);
244 0 : impl.desc_state_.read_op = nullptr;
245 0 : impl.desc_state_.write_op = nullptr;
246 0 : impl.desc_state_.connect_op = nullptr;
247 0 : }
248 0 : socket_svc->scheduler().register_descriptor(
249 : accepted, &impl.desc_state_);
250 :
251 0 : impl.set_endpoints(
252 : local_endpoint_, from_sockaddr(peer_storage));
253 :
254 0 : *ec = {};
255 0 : if (impl_out)
256 0 : *impl_out = &impl;
257 : }
258 : else
259 : {
260 0 : ::close(accepted);
261 0 : *ec = make_err(ENOENT);
262 0 : if (impl_out)
263 0 : *impl_out = nullptr;
264 : }
265 0 : return dispatch_coro(ex, h);
266 : }
267 :
268 HIT 2 : op.accepted_fd = accepted;
269 2 : op.peer_storage = peer_storage;
270 2 : op.complete(0, 0);
271 2 : op.impl_ptr = shared_from_this();
272 2 : svc_.post(&op);
273 2 : return std::noop_coroutine();
274 : }
275 :
276 4740 : if (errno == EAGAIN || errno == EWOULDBLOCK)
277 : {
278 4740 : op.impl_ptr = shared_from_this();
279 4740 : svc_.work_started();
280 :
281 4740 : std::lock_guard lock(desc_state_.mutex);
282 4740 : bool io_done = false;
283 4740 : if (desc_state_.read_ready)
284 : {
285 MIS 0 : desc_state_.read_ready = false;
286 0 : op.perform_io();
287 0 : io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
288 0 : if (!io_done)
289 0 : op.errn = 0;
290 : }
291 :
292 HIT 4740 : if (io_done || op.cancelled.load(std::memory_order_acquire))
293 : {
294 MIS 0 : svc_.post(&op);
295 0 : svc_.work_finished();
296 : }
297 : else
298 : {
299 HIT 4740 : desc_state_.read_op = &op;
300 : }
301 4740 : return std::noop_coroutine();
302 4740 : }
303 :
304 MIS 0 : op.complete(errno, 0);
305 0 : op.impl_ptr = shared_from_this();
306 0 : svc_.post(&op);
307 : // completion is always posted to scheduler queue, never inline.
308 0 : return std::noop_coroutine();
309 : }
310 :
311 : inline void
312 HIT 2 : epoll_acceptor::cancel() noexcept
313 : {
314 2 : cancel_single_op(acc_);
315 2 : }
316 :
317 : inline void
318 8 : epoll_acceptor::cancel_single_op(epoll_op& op) noexcept
319 : {
320 8 : auto self = weak_from_this().lock();
321 8 : if (!self)
322 MIS 0 : return;
323 :
324 HIT 8 : op.request_cancel();
325 :
326 8 : epoll_op* claimed = nullptr;
327 : {
328 8 : std::lock_guard lock(desc_state_.mutex);
329 8 : if (desc_state_.read_op == &op)
330 7 : claimed = std::exchange(desc_state_.read_op, nullptr);
331 8 : }
332 8 : if (claimed)
333 : {
334 7 : op.impl_ptr = self;
335 7 : svc_.post(&op);
336 7 : svc_.work_finished();
337 : }
338 8 : }
339 :
340 : inline void
341 326 : epoll_acceptor::close_socket() noexcept
342 : {
343 326 : auto self = weak_from_this().lock();
344 326 : if (self)
345 : {
346 326 : acc_.request_cancel();
347 :
348 326 : epoll_op* claimed = nullptr;
349 : {
350 326 : std::lock_guard lock(desc_state_.mutex);
351 326 : claimed = std::exchange(desc_state_.read_op, nullptr);
352 326 : desc_state_.read_ready = false;
353 326 : desc_state_.write_ready = false;
354 326 : }
355 :
356 326 : if (claimed)
357 : {
358 2 : acc_.impl_ptr = self;
359 2 : svc_.post(&acc_);
360 2 : svc_.work_finished();
361 : }
362 :
363 326 : if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
364 MIS 0 : desc_state_.impl_ref_ = self;
365 : }
366 :
367 HIT 326 : if (fd_ >= 0)
368 : {
369 81 : if (desc_state_.registered_events != 0)
370 77 : svc_.scheduler().deregister_descriptor(fd_);
371 81 : ::close(fd_);
372 81 : fd_ = -1;
373 : }
374 :
375 326 : desc_state_.fd = -1;
376 326 : desc_state_.registered_events = 0;
377 :
378 326 : local_endpoint_ = endpoint{};
379 326 : }
380 :
381 239 : inline epoll_acceptor_service::epoll_acceptor_service(
382 239 : capy::execution_context& ctx)
383 239 : : ctx_(ctx)
384 239 : , state_(
385 : std::make_unique<epoll_acceptor_state>(
386 239 : ctx.use_service<epoll_scheduler>()))
387 : {
388 239 : }
389 :
390 478 : inline epoll_acceptor_service::~epoll_acceptor_service() {}
391 :
392 : inline void
393 239 : epoll_acceptor_service::shutdown()
394 : {
395 239 : std::lock_guard lock(state_->mutex_);
396 :
397 239 : while (auto* impl = state_->acceptor_list_.pop_front())
398 MIS 0 : impl->close_socket();
399 :
400 : // Don't clear acceptor_ptrs_ here — same rationale as
401 : // epoll_socket_service::shutdown(). Let ~state_ release ptrs
402 : // after scheduler shutdown has drained all queued ops.
403 HIT 239 : }
404 :
405 : inline io_object::implementation*
406 82 : epoll_acceptor_service::construct()
407 : {
408 82 : auto impl = std::make_shared<epoll_acceptor>(*this);
409 82 : auto* raw = impl.get();
410 :
411 82 : std::lock_guard lock(state_->mutex_);
412 82 : state_->acceptor_list_.push_back(raw);
413 82 : state_->acceptor_ptrs_.emplace(raw, std::move(impl));
414 :
415 82 : return raw;
416 82 : }
417 :
418 : inline void
419 82 : epoll_acceptor_service::destroy(io_object::implementation* impl)
420 : {
421 82 : auto* epoll_impl = static_cast<epoll_acceptor*>(impl);
422 82 : epoll_impl->close_socket();
423 82 : std::lock_guard lock(state_->mutex_);
424 82 : state_->acceptor_list_.remove(epoll_impl);
425 82 : state_->acceptor_ptrs_.erase(epoll_impl);
426 82 : }
427 :
428 : inline void
429 163 : epoll_acceptor_service::close(io_object::handle& h)
430 : {
431 163 : static_cast<epoll_acceptor*>(h.get())->close_socket();
432 163 : }
433 :
434 : inline std::error_code
435 79 : epoll_acceptor::set_option(
436 : int level, int optname, void const* data, std::size_t size) noexcept
437 : {
438 79 : if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
439 : 0)
440 MIS 0 : return make_err(errno);
441 HIT 79 : return {};
442 : }
443 :
444 : inline std::error_code
445 MIS 0 : epoll_acceptor::get_option(
446 : int level, int optname, void* data, std::size_t* size) const noexcept
447 : {
448 0 : socklen_t len = static_cast<socklen_t>(*size);
449 0 : if (::getsockopt(fd_, level, optname, data, &len) != 0)
450 0 : return make_err(errno);
451 0 : *size = static_cast<std::size_t>(len);
452 0 : return {};
453 : }
454 :
455 : inline std::error_code
456 HIT 81 : epoll_acceptor_service::open_acceptor_socket(
457 : tcp_acceptor::implementation& impl, int family, int type, int protocol)
458 : {
459 81 : auto* epoll_impl = static_cast<epoll_acceptor*>(&impl);
460 81 : epoll_impl->close_socket();
461 :
462 81 : int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
463 81 : if (fd < 0)
464 MIS 0 : return make_err(errno);
465 :
466 HIT 81 : if (family == AF_INET6)
467 : {
468 8 : int val = 0; // dual-stack default
469 8 : ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
470 : }
471 :
472 81 : epoll_impl->fd_ = fd;
473 :
474 : // Set up descriptor state but do NOT register with epoll yet
475 81 : epoll_impl->desc_state_.fd = fd;
476 : {
477 81 : std::lock_guard lock(epoll_impl->desc_state_.mutex);
478 81 : epoll_impl->desc_state_.read_op = nullptr;
479 81 : }
480 :
481 81 : return {};
482 : }
483 :
484 : inline std::error_code
485 80 : epoll_acceptor_service::bind_acceptor(
486 : tcp_acceptor::implementation& impl, endpoint ep)
487 : {
488 80 : auto* epoll_impl = static_cast<epoll_acceptor*>(&impl);
489 80 : int fd = epoll_impl->fd_;
490 :
491 80 : sockaddr_storage storage{};
492 80 : socklen_t addrlen = detail::to_sockaddr(ep, storage);
493 80 : if (::bind(fd, reinterpret_cast<sockaddr*>(&storage), addrlen) < 0)
494 3 : return make_err(errno);
495 :
496 : // Cache local endpoint (resolves ephemeral port)
497 77 : sockaddr_storage local{};
498 77 : socklen_t local_len = sizeof(local);
499 77 : if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local), &local_len) == 0)
500 77 : epoll_impl->set_local_endpoint(detail::from_sockaddr(local));
501 :
502 77 : return {};
503 : }
504 :
505 : inline std::error_code
506 77 : epoll_acceptor_service::listen_acceptor(
507 : tcp_acceptor::implementation& impl, int backlog)
508 : {
509 77 : auto* epoll_impl = static_cast<epoll_acceptor*>(&impl);
510 77 : int fd = epoll_impl->fd_;
511 :
512 77 : if (::listen(fd, backlog) < 0)
513 MIS 0 : return make_err(errno);
514 :
515 : // Register fd with epoll (edge-triggered mode)
516 HIT 77 : scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
517 :
518 77 : return {};
519 : }
520 :
521 : inline void
522 11 : epoll_acceptor_service::post(epoll_op* op)
523 : {
524 11 : state_->sched_.post(op);
525 11 : }
526 :
527 : inline void
528 4740 : epoll_acceptor_service::work_started() noexcept
529 : {
530 4740 : state_->sched_.work_started();
531 4740 : }
532 :
533 : inline void
534 9 : epoll_acceptor_service::work_finished() noexcept
535 : {
536 9 : state_->sched_.work_finished();
537 9 : }
538 :
539 : inline epoll_socket_service*
540 4733 : epoll_acceptor_service::socket_service() const noexcept
541 : {
542 4733 : auto* svc = ctx_.find_service<detail::socket_service>();
543 4733 : return svc ? dynamic_cast<epoll_socket_service*>(svc) : nullptr;
544 : }
545 :
546 : } // namespace boost::corosio::detail
547 :
548 : #endif // BOOST_COROSIO_HAS_EPOLL
549 :
550 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
|