LCOV - code coverage report
Current view: top level - corosio/native/detail/epoll - epoll_op.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 83.2 % 101 84 17
Test Date: 2026-02-27 19:39:18 Functions: 85.0 % 20 17 3

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_HAS_EPOLL
      16                 : 
      17                 : #include <boost/corosio/detail/config.hpp>
      18                 : #include <boost/corosio/io/io_object.hpp>
      19                 : #include <boost/corosio/endpoint.hpp>
      20                 : #include <boost/capy/ex/executor_ref.hpp>
      21                 : #include <coroutine>
      22                 : #include <boost/capy/error.hpp>
      23                 : #include <system_error>
      24                 : 
      25                 : #include <boost/corosio/native/detail/make_err.hpp>
      26                 : #include <boost/corosio/detail/dispatch_coro.hpp>
      27                 : #include <boost/corosio/detail/scheduler_op.hpp>
      28                 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
      29                 : 
      30                 : #include <unistd.h>
      31                 : #include <errno.h>
      32                 : 
      33                 : #include <atomic>
      34                 : #include <cstddef>
      35                 : #include <memory>
      36                 : #include <mutex>
      37                 : #include <optional>
      38                 : #include <stop_token>
      39                 : 
      40                 : #include <netinet/in.h>
      41                 : #include <sys/socket.h>
      42                 : #include <sys/uio.h>
      43                 : 
      44                 : /*
      45                 :     epoll Operation State
      46                 :     =====================
      47                 : 
      48                 :     Each async I/O operation has a corresponding epoll_op-derived struct that
      49                 :     holds the operation's state while it's in flight. The socket impl owns
      50                 :     fixed slots for each operation type (conn_, rd_, wr_), so only one
      51                 :     operation of each type can be pending per socket at a time.
      52                 : 
      53                 :     Persistent Registration
      54                 :     -----------------------
      55                 :     File descriptors are registered with epoll once (via descriptor_state) and
      56                 :     stay registered until closed. The descriptor_state tracks which operations
      57                 :     are pending (read_op, write_op, connect_op). When an event arrives, the
      58                 :     reactor dispatches to the appropriate pending operation.
      59                 : 
      60                 :     Impl Lifetime Management
      61                 :     ------------------------
      62                 :     When cancel() posts an op to the scheduler's ready queue, the socket impl
      63                 :     might be destroyed before the scheduler processes the op. The `impl_ptr`
      64                 :     member holds a shared_ptr to the impl, keeping it alive until the op
      65                 :     completes. This is set by cancel() and cleared in operator() after the
      66                 :     coroutine is resumed.
      67                 : 
      68                 :     EOF Detection
      69                 :     -------------
      70                 :     For reads, 0 bytes with no error means EOF. But an empty user buffer also
      71                 :     returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
      72                 : 
      73                 :     SIGPIPE Prevention
      74                 :     ------------------
      75                 :     Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
      76                 :     SIGPIPE when the peer has closed.
      77                 : */
      78                 : 
      79                 : namespace boost::corosio::detail {
      80                 : 
      81                 : // Forward declarations
      82                 : class epoll_socket;
      83                 : class epoll_acceptor;
      84                 : struct epoll_op;
      85                 : 
      86                 : // Forward declaration
      87                 : class epoll_scheduler;
      88                 : 
      89                 : /** Per-descriptor state for persistent epoll registration.
      90                 : 
      91                 :     Tracks pending operations for a file descriptor. The fd is registered
      92                 :     once with epoll and stays registered until closed.
      93                 : 
      94                 :     This struct extends scheduler_op to support deferred I/O processing.
      95                 :     When epoll events arrive, the reactor sets ready_events and queues
      96                 :     this descriptor for processing. When popped from the scheduler queue,
      97                 :     operator() performs the actual I/O and queues completion handlers.
      98                 : 
      99                 :     @par Deferred I/O Model
     100                 :     The reactor no longer performs I/O directly. Instead:
     101                 :     1. Reactor sets ready_events and queues descriptor_state
     102                 :     2. Scheduler pops descriptor_state and calls operator()
     103                 :     3. operator() performs I/O under mutex and queues completions
     104                 : 
     105                 :     This eliminates per-descriptor mutex locking from the reactor hot path.
     106                 : 
     107                 :     @par Thread Safety
     108                 :     The mutex protects operation pointers and ready flags during I/O.
     109                 :     ready_events_ and is_enqueued_ are atomic for lock-free reactor access.
     110                 : */
     111                 : struct descriptor_state final : scheduler_op
     112                 : {
     113                 :     std::mutex mutex;
     114                 : 
     115                 :     // Protected by mutex
     116                 :     epoll_op* read_op    = nullptr;
     117                 :     epoll_op* write_op   = nullptr;
     118                 :     epoll_op* connect_op = nullptr;
     119                 : 
     120                 :     // Caches edge events that arrived before an op was registered
     121                 :     bool read_ready  = false;
     122                 :     bool write_ready = false;
     123                 : 
     124                 :     // Deferred cancellation: set by cancel() when the target op is not
     125                 :     // parked (e.g. completing inline via speculative I/O). Checked when
     126                 :     // the next op parks; if set, the op is immediately self-cancelled.
     127                 :     // This matches IOCP semantics where CancelIoEx always succeeds.
     128                 :     bool read_cancel_pending    = false;
     129                 :     bool write_cancel_pending   = false;
     130                 :     bool connect_cancel_pending = false;
     131                 : 
     132                 :     // Set during registration only (no mutex needed)
     133                 :     std::uint32_t registered_events = 0;
     134                 :     int fd                          = -1;
     135                 : 
     136                 :     // For deferred I/O - set by reactor, read by scheduler
     137                 :     std::atomic<std::uint32_t> ready_events_{0};
     138                 :     std::atomic<bool> is_enqueued_{false};
     139                 :     epoll_scheduler const* scheduler_ = nullptr;
     140                 : 
     141                 :     // Prevents impl destruction while this descriptor_state is queued.
     142                 :     // Set by close_socket() when is_enqueued_ is true, cleared by operator().
     143                 :     std::shared_ptr<void> impl_ref_;
     144                 : 
     145                 :     /// Add ready events atomically.
     146 HIT       45424 :     void add_ready_events(std::uint32_t ev) noexcept
     147                 :     {
     148           45424 :         ready_events_.fetch_or(ev, std::memory_order_relaxed);
     149           45424 :     }
     150                 : 
     151                 :     /// Perform deferred I/O and queue completions.
     152                 :     void operator()() override;
     153                 : 
     154                 :     /// Destroy without invoking.
     155                 :     /// Called during scheduler::shutdown() drain. Clear impl_ref_ to break
     156                 :     /// the self-referential cycle set by close_socket().
     157              30 :     void destroy() override
     158                 :     {
     159              30 :         impl_ref_.reset();
     160              30 :     }
     161                 : };
     162                 : 
     163                 : struct epoll_op : scheduler_op
     164                 : {
     165                 :     struct canceller
     166                 :     {
     167                 :         epoll_op* op;
     168                 :         void operator()() const noexcept;
     169                 :     };
     170                 : 
     171                 :     std::coroutine_handle<> h;
     172                 :     capy::executor_ref ex;
     173                 :     std::error_code* ec_out = nullptr;
     174                 :     std::size_t* bytes_out  = nullptr;
     175                 : 
     176                 :     int fd                        = -1;
     177                 :     int errn                      = 0;
     178                 :     std::size_t bytes_transferred = 0;
     179                 : 
     180                 :     std::atomic<bool> cancelled{false};
     181                 :     std::optional<std::stop_callback<canceller>> stop_cb;
     182                 : 
     183                 :     // Prevents use-after-free when socket is closed with pending ops.
     184                 :     // See "Impl Lifetime Management" in file header.
     185                 :     std::shared_ptr<void> impl_ptr;
     186                 : 
     187                 :     // For stop_token cancellation - pointer to owning socket/acceptor impl.
     188                 :     // When stop is requested, we call back to the impl to perform actual I/O cancellation.
     189                 :     epoll_socket* socket_impl_     = nullptr;
     190                 :     epoll_acceptor* acceptor_impl_ = nullptr;
     191                 : 
     192           42871 :     epoll_op() = default;
     193                 : 
     194          261246 :     void reset() noexcept
     195                 :     {
     196          261246 :         fd                = -1;
     197          261246 :         errn              = 0;
     198          261246 :         bytes_transferred = 0;
     199          261246 :         cancelled.store(false, std::memory_order_relaxed);
     200          261246 :         impl_ptr.reset();
     201          261246 :         socket_impl_   = nullptr;
     202          261246 :         acceptor_impl_ = nullptr;
     203          261246 :     }
     204                 : 
     205                 :     // Defined in sockets.cpp where epoll_socket is complete
     206                 :     void operator()() override;
     207                 : 
     208           25126 :     virtual bool is_read_operation() const noexcept
     209                 :     {
     210           25126 :         return false;
     211                 :     }
     212                 :     virtual void cancel() noexcept = 0;
     213                 : 
     214 MIS           0 :     void destroy() override
     215                 :     {
     216               0 :         stop_cb.reset();
     217               0 :         impl_ptr.reset();
     218               0 :     }
     219                 : 
     220 HIT      129276 :     void request_cancel() noexcept
     221                 :     {
     222          129276 :         cancelled.store(true, std::memory_order_release);
     223          129276 :     }
     224                 : 
     225           55178 :     void start(std::stop_token const& token, epoll_socket* impl)
     226                 :     {
     227           55178 :         cancelled.store(false, std::memory_order_release);
     228           55178 :         stop_cb.reset();
     229           55178 :         socket_impl_   = impl;
     230           55178 :         acceptor_impl_ = nullptr;
     231                 : 
     232           55178 :         if (token.stop_possible())
     233              99 :             stop_cb.emplace(token, canceller{this});
     234           55178 :     }
     235                 : 
     236            4742 :     void start(std::stop_token const& token, epoll_acceptor* impl)
     237                 :     {
     238            4742 :         cancelled.store(false, std::memory_order_release);
     239            4742 :         stop_cb.reset();
     240            4742 :         socket_impl_   = nullptr;
     241            4742 :         acceptor_impl_ = impl;
     242                 : 
     243            4742 :         if (token.stop_possible())
     244               9 :             stop_cb.emplace(token, canceller{this});
     245            4742 :     }
     246                 : 
     247           59856 :     void complete(int err, std::size_t bytes) noexcept
     248                 :     {
     249           59856 :         errn              = err;
     250           59856 :         bytes_transferred = bytes;
     251           59856 :     }
     252                 : 
     253 MIS           0 :     virtual void perform_io() noexcept {}
     254                 : };
     255                 : 
     256                 : struct epoll_connect_op final : epoll_op
     257                 : {
     258                 :     endpoint target_endpoint;
     259                 : 
     260 HIT        4736 :     void reset() noexcept
     261                 :     {
     262            4736 :         epoll_op::reset();
     263            4736 :         target_endpoint = endpoint{};
     264            4736 :     }
     265                 : 
     266            4734 :     void perform_io() noexcept override
     267                 :     {
     268                 :         // connect() completion status is retrieved via SO_ERROR, not return value
     269            4734 :         int err       = 0;
     270            4734 :         socklen_t len = sizeof(err);
     271            4734 :         if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
     272 MIS           0 :             err = errno;
     273 HIT        4734 :         complete(err, 0);
     274            4734 :     }
     275                 : 
     276                 :     // Defined in sockets.cpp where epoll_socket is complete
     277                 :     void operator()() override;
     278                 :     void cancel() noexcept override;
     279                 : };
     280                 : 
     281                 : struct epoll_read_op final : epoll_op
     282                 : {
     283                 :     static constexpr std::size_t max_buffers = 16;
     284                 :     iovec iovecs[max_buffers];
     285                 :     int iovec_count        = 0;
     286                 :     bool empty_buffer_read = false;
     287                 : 
     288           25111 :     bool is_read_operation() const noexcept override
     289                 :     {
     290           25111 :         return !empty_buffer_read;
     291                 :     }
     292                 : 
     293          125984 :     void reset() noexcept
     294                 :     {
     295          125984 :         epoll_op::reset();
     296          125984 :         iovec_count       = 0;
     297          125984 :         empty_buffer_read = false;
     298          125984 :     }
     299                 : 
     300             146 :     void perform_io() noexcept override
     301                 :     {
     302                 :         ssize_t n;
     303                 :         do
     304                 :         {
     305             146 :             n = ::readv(fd, iovecs, iovec_count);
     306                 :         }
     307             146 :         while (n < 0 && errno == EINTR);
     308                 : 
     309             146 :         if (n >= 0)
     310               4 :             complete(0, static_cast<std::size_t>(n));
     311                 :         else
     312             142 :             complete(errno, 0);
     313             146 :     }
     314                 : 
     315                 :     void cancel() noexcept override;
     316                 : };
     317                 : 
     318                 : struct epoll_write_op final : epoll_op
     319                 : {
     320                 :     static constexpr std::size_t max_buffers = 16;
     321                 :     iovec iovecs[max_buffers];
     322                 :     int iovec_count = 0;
     323                 : 
     324          125784 :     void reset() noexcept
     325                 :     {
     326          125784 :         epoll_op::reset();
     327          125784 :         iovec_count = 0;
     328          125784 :     }
     329                 : 
     330 MIS           0 :     void perform_io() noexcept override
     331                 :     {
     332               0 :         msghdr msg{};
     333               0 :         msg.msg_iov    = iovecs;
     334               0 :         msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
     335                 : 
     336                 :         ssize_t n;
     337                 :         do
     338                 :         {
     339               0 :             n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
     340                 :         }
     341               0 :         while (n < 0 && errno == EINTR);
     342                 : 
     343               0 :         if (n >= 0)
     344               0 :             complete(0, static_cast<std::size_t>(n));
     345                 :         else
     346               0 :             complete(errno, 0);
     347               0 :     }
     348                 : 
     349                 :     void cancel() noexcept override;
     350                 : };
     351                 : 
     352                 : struct epoll_accept_op final : epoll_op
     353                 : {
     354                 :     int accepted_fd                      = -1;
     355                 :     io_object::implementation** impl_out = nullptr;
     356                 :     sockaddr_storage peer_storage{};
     357                 : 
     358 HIT        4742 :     void reset() noexcept
     359                 :     {
     360            4742 :         epoll_op::reset();
     361            4742 :         accepted_fd  = -1;
     362            4742 :         impl_out     = nullptr;
     363            4742 :         peer_storage = {};
     364            4742 :     }
     365                 : 
     366            4731 :     void perform_io() noexcept override
     367                 :     {
     368            4731 :         socklen_t addrlen = sizeof(peer_storage);
     369                 :         int new_fd;
     370                 :         do
     371                 :         {
     372            9462 :             new_fd = ::accept4(
     373            4731 :                 fd, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
     374                 :                 SOCK_NONBLOCK | SOCK_CLOEXEC);
     375                 :         }
     376            4731 :         while (new_fd < 0 && errno == EINTR);
     377                 : 
     378            4731 :         if (new_fd >= 0)
     379                 :         {
     380            4731 :             accepted_fd = new_fd;
     381            4731 :             complete(0, 0);
     382                 :         }
     383                 :         else
     384                 :         {
     385 MIS           0 :             complete(errno, 0);
     386                 :         }
     387 HIT        4731 :     }
     388                 : 
     389                 :     // Defined in acceptors.cpp where epoll_acceptor is complete
     390                 :     void operator()() override;
     391                 :     void cancel() noexcept override;
     392                 : };
     393                 : 
     394                 : } // namespace boost::corosio::detail
     395                 : 
     396                 : #endif // BOOST_COROSIO_HAS_EPOLL
     397                 : 
     398                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
        

Generated by: LCOV version 2.3