LCOV - code coverage report
Current view: top level - corosio/native/detail/epoll - epoll_acceptor_service.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 80.6 % 252 203 49
Test Date: 2026-02-27 19:39:18 Functions: 96.0 % 25 24 1

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_HAS_EPOLL
      16                 : 
      17                 : #include <boost/corosio/detail/config.hpp>
      18                 : #include <boost/capy/ex/execution_context.hpp>
      19                 : #include <boost/corosio/detail/acceptor_service.hpp>
      20                 : 
      21                 : #include <boost/corosio/native/detail/epoll/epoll_acceptor.hpp>
      22                 : #include <boost/corosio/native/detail/epoll/epoll_socket_service.hpp>
      23                 : #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
      24                 : 
      25                 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
      26                 : #include <boost/corosio/detail/dispatch_coro.hpp>
      27                 : #include <boost/corosio/native/detail/make_err.hpp>
      28                 : 
      29                 : #include <memory>
      30                 : #include <mutex>
      31                 : #include <unordered_map>
      32                 : #include <utility>
      33                 : 
      34                 : #include <errno.h>
      35                 : #include <netinet/in.h>
      36                 : #include <sys/epoll.h>
      37                 : #include <sys/socket.h>
      38                 : #include <unistd.h>
      39                 : 
      40                 : namespace boost::corosio::detail {
      41                 : 
      42                 : /** State for epoll acceptor service. */
      43                 : class epoll_acceptor_state
      44                 : {
      45                 : public:
      46 HIT         239 :     explicit epoll_acceptor_state(epoll_scheduler& sched) noexcept
      47             239 :         : sched_(sched)
      48                 :     {
      49             239 :     }
      50                 : 
      51                 :     epoll_scheduler& sched_;
      52                 :     std::mutex mutex_;
      53                 :     intrusive_list<epoll_acceptor> acceptor_list_;
      54                 :     std::unordered_map<epoll_acceptor*, std::shared_ptr<epoll_acceptor>>
      55                 :         acceptor_ptrs_;
      56                 : };
      57                 : 
      58                 : /** epoll acceptor service implementation.
      59                 : 
      60                 :     Inherits from acceptor_service to enable runtime polymorphism.
      61                 :     Uses key_type = acceptor_service for service lookup.
      62                 : */
      63                 : class BOOST_COROSIO_DECL epoll_acceptor_service final : public acceptor_service
      64                 : {
      65                 : public:
      66                 :     explicit epoll_acceptor_service(capy::execution_context& ctx);
      67                 :     ~epoll_acceptor_service() override;
      68                 : 
      69                 :     epoll_acceptor_service(epoll_acceptor_service const&)            = delete;
      70                 :     epoll_acceptor_service& operator=(epoll_acceptor_service const&) = delete;
      71                 : 
      72                 :     void shutdown() override;
      73                 : 
      74                 :     io_object::implementation* construct() override;
      75                 :     void destroy(io_object::implementation*) override;
      76                 :     void close(io_object::handle&) override;
      77                 :     std::error_code open_acceptor_socket(
      78                 :         tcp_acceptor::implementation& impl,
      79                 :         int family,
      80                 :         int type,
      81                 :         int protocol) override;
      82                 :     std::error_code
      83                 :     bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
      84                 :     std::error_code
      85                 :     listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
      86                 : 
      87            4898 :     epoll_scheduler& scheduler() const noexcept
      88                 :     {
      89            4898 :         return state_->sched_;
      90                 :     }
      91                 :     void post(epoll_op* op);
      92                 :     void work_started() noexcept;
      93                 :     void work_finished() noexcept;
      94                 : 
      95                 :     /** Get the socket service for creating peer sockets during accept. */
      96                 :     epoll_socket_service* socket_service() const noexcept;
      97                 : 
      98                 : private:
      99                 :     capy::execution_context& ctx_;
     100                 :     std::unique_ptr<epoll_acceptor_state> state_;
     101                 : };
     102                 : 
     103                 : //--------------------------------------------------------------------------
     104                 : //
     105                 : // Implementation
     106                 : //
     107                 : //--------------------------------------------------------------------------
     108                 : 
     109                 : inline void
     110               6 : epoll_accept_op::cancel() noexcept
     111                 : {
     112               6 :     if (acceptor_impl_)
     113               6 :         acceptor_impl_->cancel_single_op(*this);
     114                 :     else
     115 MIS           0 :         request_cancel();
     116 HIT           6 : }
     117                 : 
     118                 : inline void
     119            4742 : epoll_accept_op::operator()()
     120                 : {
     121            4742 :     stop_cb.reset();
     122                 : 
     123            4742 :     static_cast<epoll_acceptor*>(acceptor_impl_)
     124            4742 :         ->service()
     125            4742 :         .scheduler()
     126            4742 :         .reset_inline_budget();
     127                 : 
     128            4742 :     bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
     129                 : 
     130            4742 :     if (cancelled.load(std::memory_order_acquire))
     131               9 :         *ec_out = capy::error::canceled;
     132            4733 :     else if (errn != 0)
     133 MIS           0 :         *ec_out = make_err(errn);
     134                 :     else
     135 HIT        4733 :         *ec_out = {};
     136                 : 
     137                 :     // Set up the peer socket on success
     138            4742 :     if (success && accepted_fd >= 0 && acceptor_impl_)
     139                 :     {
     140            4733 :         auto* socket_svc = static_cast<epoll_acceptor*>(acceptor_impl_)
     141            4733 :                                ->service()
     142            4733 :                                .socket_service();
     143            4733 :         if (socket_svc)
     144                 :         {
     145            4733 :             auto& impl = static_cast<epoll_socket&>(*socket_svc->construct());
     146            4733 :             impl.set_socket(accepted_fd);
     147                 : 
     148            4733 :             impl.desc_state_.fd = accepted_fd;
     149                 :             {
     150            4733 :                 std::lock_guard lock(impl.desc_state_.mutex);
     151            4733 :                 impl.desc_state_.read_op    = nullptr;
     152            4733 :                 impl.desc_state_.write_op   = nullptr;
     153            4733 :                 impl.desc_state_.connect_op = nullptr;
     154            4733 :             }
     155            4733 :             socket_svc->scheduler().register_descriptor(
     156                 :                 accepted_fd, &impl.desc_state_);
     157                 : 
     158            4733 :             impl.set_endpoints(
     159            4733 :                 static_cast<epoll_acceptor*>(acceptor_impl_)->local_endpoint(),
     160            4733 :                 from_sockaddr(peer_storage));
     161                 : 
     162            4733 :             if (impl_out)
     163            4733 :                 *impl_out = &impl;
     164            4733 :             accepted_fd = -1;
     165                 :         }
     166                 :         else
     167                 :         {
     168                 :             // No socket service — treat as error
     169 MIS           0 :             *ec_out = make_err(ENOENT);
     170               0 :             success = false;
     171                 :         }
     172                 :     }
     173                 : 
     174 HIT        4742 :     if (!success || !acceptor_impl_)
     175                 :     {
     176               9 :         if (accepted_fd >= 0)
     177                 :         {
     178 MIS           0 :             ::close(accepted_fd);
     179               0 :             accepted_fd = -1;
     180                 :         }
     181 HIT           9 :         if (impl_out)
     182               9 :             *impl_out = nullptr;
     183                 :     }
     184                 : 
     185                 :     // Move to stack before resuming. See epoll_op::operator()() for rationale.
     186            4742 :     capy::executor_ref saved_ex(ex);
     187            4742 :     std::coroutine_handle<> saved_h(h);
     188            4742 :     auto prevent_premature_destruction = std::move(impl_ptr);
     189            4742 :     dispatch_coro(saved_ex, saved_h).resume();
     190            4742 : }
     191                 : 
     192              82 : inline epoll_acceptor::epoll_acceptor(epoll_acceptor_service& svc) noexcept
     193              82 :     : svc_(svc)
     194                 : {
     195              82 : }
     196                 : 
     197                 : inline std::coroutine_handle<>
     198            4742 : epoll_acceptor::accept(
     199                 :     std::coroutine_handle<> h,
     200                 :     capy::executor_ref ex,
     201                 :     std::stop_token token,
     202                 :     std::error_code* ec,
     203                 :     io_object::implementation** impl_out)
     204                 : {
     205            4742 :     auto& op = acc_;
     206            4742 :     op.reset();
     207            4742 :     op.h        = h;
     208            4742 :     op.ex       = ex;
     209            4742 :     op.ec_out   = ec;
     210            4742 :     op.impl_out = impl_out;
     211            4742 :     op.fd       = fd_;
     212            4742 :     op.start(token, this);
     213                 : 
     214            4742 :     sockaddr_storage peer_storage{};
     215            4742 :     socklen_t addrlen = sizeof(peer_storage);
     216                 :     int accepted;
     217                 :     do
     218                 :     {
     219            4742 :         accepted = ::accept4(
     220                 :             fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
     221                 :             SOCK_NONBLOCK | SOCK_CLOEXEC);
     222                 :     }
     223            4742 :     while (accepted < 0 && errno == EINTR);
     224                 : 
     225            4742 :     if (accepted >= 0)
     226                 :     {
     227                 :         {
     228               2 :             std::lock_guard lock(desc_state_.mutex);
     229               2 :             desc_state_.read_ready = false;
     230               2 :         }
     231                 : 
     232               2 :         if (svc_.scheduler().try_consume_inline_budget())
     233                 :         {
     234 MIS           0 :             auto* socket_svc = svc_.socket_service();
     235               0 :             if (socket_svc)
     236                 :             {
     237                 :                 auto& impl =
     238               0 :                     static_cast<epoll_socket&>(*socket_svc->construct());
     239               0 :                 impl.set_socket(accepted);
     240                 : 
     241               0 :                 impl.desc_state_.fd = accepted;
     242                 :                 {
     243               0 :                     std::lock_guard lock(impl.desc_state_.mutex);
     244               0 :                     impl.desc_state_.read_op    = nullptr;
     245               0 :                     impl.desc_state_.write_op   = nullptr;
     246               0 :                     impl.desc_state_.connect_op = nullptr;
     247               0 :                 }
     248               0 :                 socket_svc->scheduler().register_descriptor(
     249                 :                     accepted, &impl.desc_state_);
     250                 : 
     251               0 :                 impl.set_endpoints(
     252                 :                     local_endpoint_, from_sockaddr(peer_storage));
     253                 : 
     254               0 :                 *ec = {};
     255               0 :                 if (impl_out)
     256               0 :                     *impl_out = &impl;
     257                 :             }
     258                 :             else
     259                 :             {
     260               0 :                 ::close(accepted);
     261               0 :                 *ec = make_err(ENOENT);
     262               0 :                 if (impl_out)
     263               0 :                     *impl_out = nullptr;
     264                 :             }
     265               0 :             return dispatch_coro(ex, h);
     266                 :         }
     267                 : 
     268 HIT           2 :         op.accepted_fd  = accepted;
     269               2 :         op.peer_storage = peer_storage;
     270               2 :         op.complete(0, 0);
     271               2 :         op.impl_ptr = shared_from_this();
     272               2 :         svc_.post(&op);
     273               2 :         return std::noop_coroutine();
     274                 :     }
     275                 : 
     276            4740 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     277                 :     {
     278            4740 :         op.impl_ptr = shared_from_this();
     279            4740 :         svc_.work_started();
     280                 : 
     281            4740 :         std::lock_guard lock(desc_state_.mutex);
     282            4740 :         bool io_done = false;
     283            4740 :         if (desc_state_.read_ready)
     284                 :         {
     285 MIS           0 :             desc_state_.read_ready = false;
     286               0 :             op.perform_io();
     287               0 :             io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
     288               0 :             if (!io_done)
     289               0 :                 op.errn = 0;
     290                 :         }
     291                 : 
     292 HIT        4740 :         if (io_done || op.cancelled.load(std::memory_order_acquire))
     293                 :         {
     294 MIS           0 :             svc_.post(&op);
     295               0 :             svc_.work_finished();
     296                 :         }
     297                 :         else
     298                 :         {
     299 HIT        4740 :             desc_state_.read_op = &op;
     300                 :         }
     301            4740 :         return std::noop_coroutine();
     302            4740 :     }
     303                 : 
     304 MIS           0 :     op.complete(errno, 0);
     305               0 :     op.impl_ptr = shared_from_this();
     306               0 :     svc_.post(&op);
     307                 :     // completion is always posted to scheduler queue, never inline.
     308               0 :     return std::noop_coroutine();
     309                 : }
     310                 : 
     311                 : inline void
     312 HIT           2 : epoll_acceptor::cancel() noexcept
     313                 : {
     314               2 :     cancel_single_op(acc_);
     315               2 : }
     316                 : 
     317                 : inline void
     318               8 : epoll_acceptor::cancel_single_op(epoll_op& op) noexcept
     319                 : {
     320               8 :     auto self = weak_from_this().lock();
     321               8 :     if (!self)
     322 MIS           0 :         return;
     323                 : 
     324 HIT           8 :     op.request_cancel();
     325                 : 
     326               8 :     epoll_op* claimed = nullptr;
     327                 :     {
     328               8 :         std::lock_guard lock(desc_state_.mutex);
     329               8 :         if (desc_state_.read_op == &op)
     330               7 :             claimed = std::exchange(desc_state_.read_op, nullptr);
     331               8 :     }
     332               8 :     if (claimed)
     333                 :     {
     334               7 :         op.impl_ptr = self;
     335               7 :         svc_.post(&op);
     336               7 :         svc_.work_finished();
     337                 :     }
     338               8 : }
     339                 : 
     340                 : inline void
     341             326 : epoll_acceptor::close_socket() noexcept
     342                 : {
     343             326 :     auto self = weak_from_this().lock();
     344             326 :     if (self)
     345                 :     {
     346             326 :         acc_.request_cancel();
     347                 : 
     348             326 :         epoll_op* claimed = nullptr;
     349                 :         {
     350             326 :             std::lock_guard lock(desc_state_.mutex);
     351             326 :             claimed = std::exchange(desc_state_.read_op, nullptr);
     352             326 :             desc_state_.read_ready  = false;
     353             326 :             desc_state_.write_ready = false;
     354             326 :         }
     355                 : 
     356             326 :         if (claimed)
     357                 :         {
     358               2 :             acc_.impl_ptr = self;
     359               2 :             svc_.post(&acc_);
     360               2 :             svc_.work_finished();
     361                 :         }
     362                 : 
     363             326 :         if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
     364 MIS           0 :             desc_state_.impl_ref_ = self;
     365                 :     }
     366                 : 
     367 HIT         326 :     if (fd_ >= 0)
     368                 :     {
     369              81 :         if (desc_state_.registered_events != 0)
     370              77 :             svc_.scheduler().deregister_descriptor(fd_);
     371              81 :         ::close(fd_);
     372              81 :         fd_ = -1;
     373                 :     }
     374                 : 
     375             326 :     desc_state_.fd                = -1;
     376             326 :     desc_state_.registered_events = 0;
     377                 : 
     378             326 :     local_endpoint_ = endpoint{};
     379             326 : }
     380                 : 
     381             239 : inline epoll_acceptor_service::epoll_acceptor_service(
     382             239 :     capy::execution_context& ctx)
     383             239 :     : ctx_(ctx)
     384             239 :     , state_(
     385                 :           std::make_unique<epoll_acceptor_state>(
     386             239 :               ctx.use_service<epoll_scheduler>()))
     387                 : {
     388             239 : }
     389                 : 
     390             478 : inline epoll_acceptor_service::~epoll_acceptor_service() {}
     391                 : 
     392                 : inline void
     393             239 : epoll_acceptor_service::shutdown()
     394                 : {
     395             239 :     std::lock_guard lock(state_->mutex_);
     396                 : 
     397             239 :     while (auto* impl = state_->acceptor_list_.pop_front())
     398 MIS           0 :         impl->close_socket();
     399                 : 
     400                 :     // Don't clear acceptor_ptrs_ here — same rationale as
     401                 :     // epoll_socket_service::shutdown(). Let ~state_ release ptrs
     402                 :     // after scheduler shutdown has drained all queued ops.
     403 HIT         239 : }
     404                 : 
     405                 : inline io_object::implementation*
     406              82 : epoll_acceptor_service::construct()
     407                 : {
     408              82 :     auto impl = std::make_shared<epoll_acceptor>(*this);
     409              82 :     auto* raw = impl.get();
     410                 : 
     411              82 :     std::lock_guard lock(state_->mutex_);
     412              82 :     state_->acceptor_list_.push_back(raw);
     413              82 :     state_->acceptor_ptrs_.emplace(raw, std::move(impl));
     414                 : 
     415              82 :     return raw;
     416              82 : }
     417                 : 
     418                 : inline void
     419              82 : epoll_acceptor_service::destroy(io_object::implementation* impl)
     420                 : {
     421              82 :     auto* epoll_impl = static_cast<epoll_acceptor*>(impl);
     422              82 :     epoll_impl->close_socket();
     423              82 :     std::lock_guard lock(state_->mutex_);
     424              82 :     state_->acceptor_list_.remove(epoll_impl);
     425              82 :     state_->acceptor_ptrs_.erase(epoll_impl);
     426              82 : }
     427                 : 
     428                 : inline void
     429             163 : epoll_acceptor_service::close(io_object::handle& h)
     430                 : {
     431             163 :     static_cast<epoll_acceptor*>(h.get())->close_socket();
     432             163 : }
     433                 : 
     434                 : inline std::error_code
     435              79 : epoll_acceptor::set_option(
     436                 :     int level, int optname, void const* data, std::size_t size) noexcept
     437                 : {
     438              79 :     if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
     439                 :         0)
     440 MIS           0 :         return make_err(errno);
     441 HIT          79 :     return {};
     442                 : }
     443                 : 
     444                 : inline std::error_code
     445 MIS           0 : epoll_acceptor::get_option(
     446                 :     int level, int optname, void* data, std::size_t* size) const noexcept
     447                 : {
     448               0 :     socklen_t len = static_cast<socklen_t>(*size);
     449               0 :     if (::getsockopt(fd_, level, optname, data, &len) != 0)
     450               0 :         return make_err(errno);
     451               0 :     *size = static_cast<std::size_t>(len);
     452               0 :     return {};
     453                 : }
     454                 : 
     455                 : inline std::error_code
     456 HIT          81 : epoll_acceptor_service::open_acceptor_socket(
     457                 :     tcp_acceptor::implementation& impl, int family, int type, int protocol)
     458                 : {
     459              81 :     auto* epoll_impl = static_cast<epoll_acceptor*>(&impl);
     460              81 :     epoll_impl->close_socket();
     461                 : 
     462              81 :     int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
     463              81 :     if (fd < 0)
     464 MIS           0 :         return make_err(errno);
     465                 : 
     466 HIT          81 :     if (family == AF_INET6)
     467                 :     {
     468               8 :         int val = 0; // dual-stack default
     469               8 :         ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
     470                 :     }
     471                 : 
     472              81 :     epoll_impl->fd_ = fd;
     473                 : 
     474                 :     // Set up descriptor state but do NOT register with epoll yet
     475              81 :     epoll_impl->desc_state_.fd = fd;
     476                 :     {
     477              81 :         std::lock_guard lock(epoll_impl->desc_state_.mutex);
     478              81 :         epoll_impl->desc_state_.read_op = nullptr;
     479              81 :     }
     480                 : 
     481              81 :     return {};
     482                 : }
     483                 : 
     484                 : inline std::error_code
     485              80 : epoll_acceptor_service::bind_acceptor(
     486                 :     tcp_acceptor::implementation& impl, endpoint ep)
     487                 : {
     488              80 :     auto* epoll_impl = static_cast<epoll_acceptor*>(&impl);
     489              80 :     int fd           = epoll_impl->fd_;
     490                 : 
     491              80 :     sockaddr_storage storage{};
     492              80 :     socklen_t addrlen = detail::to_sockaddr(ep, storage);
     493              80 :     if (::bind(fd, reinterpret_cast<sockaddr*>(&storage), addrlen) < 0)
     494               3 :         return make_err(errno);
     495                 : 
     496                 :     // Cache local endpoint (resolves ephemeral port)
     497              77 :     sockaddr_storage local{};
     498              77 :     socklen_t local_len = sizeof(local);
     499              77 :     if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local), &local_len) == 0)
     500              77 :         epoll_impl->set_local_endpoint(detail::from_sockaddr(local));
     501                 : 
     502              77 :     return {};
     503                 : }
     504                 : 
     505                 : inline std::error_code
     506              77 : epoll_acceptor_service::listen_acceptor(
     507                 :     tcp_acceptor::implementation& impl, int backlog)
     508                 : {
     509              77 :     auto* epoll_impl = static_cast<epoll_acceptor*>(&impl);
     510              77 :     int fd           = epoll_impl->fd_;
     511                 : 
     512              77 :     if (::listen(fd, backlog) < 0)
     513 MIS           0 :         return make_err(errno);
     514                 : 
     515                 :     // Register fd with epoll (edge-triggered mode)
     516 HIT          77 :     scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
     517                 : 
     518              77 :     return {};
     519                 : }
     520                 : 
     521                 : inline void
     522              11 : epoll_acceptor_service::post(epoll_op* op)
     523                 : {
     524              11 :     state_->sched_.post(op);
     525              11 : }
     526                 : 
     527                 : inline void
     528            4740 : epoll_acceptor_service::work_started() noexcept
     529                 : {
     530            4740 :     state_->sched_.work_started();
     531            4740 : }
     532                 : 
     533                 : inline void
     534               9 : epoll_acceptor_service::work_finished() noexcept
     535                 : {
     536               9 :     state_->sched_.work_finished();
     537               9 : }
     538                 : 
     539                 : inline epoll_socket_service*
     540            4733 : epoll_acceptor_service::socket_service() const noexcept
     541                 : {
     542            4733 :     auto* svc = ctx_.find_service<detail::socket_service>();
     543            4733 :     return svc ? dynamic_cast<epoll_socket_service*>(svc) : nullptr;
     544                 : }
     545                 : 
     546                 : } // namespace boost::corosio::detail
     547                 : 
     548                 : #endif // BOOST_COROSIO_HAS_EPOLL
     549                 : 
     550                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
        

Generated by: LCOV version 2.3