LCOV - code coverage report
Current view: top level - corosio/native/detail/select - select_socket_service.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 75.7 % 346 262 84
Test Date: 2026-02-27 19:39:18 Functions: 93.3 % 30 28 2

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_HAS_SELECT
      16                 : 
      17                 : #include <boost/corosio/detail/config.hpp>
      18                 : #include <boost/capy/ex/execution_context.hpp>
      19                 : #include <boost/corosio/detail/socket_service.hpp>
      20                 : 
      21                 : #include <boost/corosio/native/detail/select/select_socket.hpp>
      22                 : #include <boost/corosio/native/detail/select/select_scheduler.hpp>
      23                 : 
      24                 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
      25                 : #include <boost/corosio/detail/dispatch_coro.hpp>
      26                 : #include <boost/corosio/native/detail/make_err.hpp>
      27                 : 
      28                 : #include <boost/corosio/detail/except.hpp>
      29                 : 
      30                 : #include <boost/capy/buffers.hpp>
      31                 : 
      32                 : #include <errno.h>
      33                 : #include <fcntl.h>
      34                 : #include <netinet/in.h>
      35                 : #include <netinet/tcp.h>
      36                 : #include <sys/socket.h>
      37                 : #include <unistd.h>
      38                 : 
      39                 : #include <memory>
      40                 : #include <mutex>
      41                 : #include <unordered_map>
      42                 : 
      43                 : /*
      44                 :     select Socket Implementation
      45                 :     ============================
      46                 : 
      47                 :     This mirrors the epoll_sockets design for behavioral consistency.
      48                 :     Each I/O operation follows the same pattern:
      49                 :       1. Try the syscall immediately (non-blocking socket)
      50                 :       2. If it succeeds or fails with a real error, post to completion queue
      51                 :       3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait
      52                 : 
      53                 :     Cancellation
      54                 :     ------------
      55                 :     See op.hpp for the completion/cancellation race handling via the
      56                 :     `registered` atomic. cancel() must complete pending operations (post
      57                 :     them with cancelled flag) so coroutines waiting on them can resume.
      58                 :     close_socket() calls cancel() first to ensure this.
      59                 : 
      60                 :     Impl Lifetime with shared_ptr
      61                 :     -----------------------------
      62                 :     Socket impls use enable_shared_from_this. The service owns impls via
      63                 :     shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
      64                 :     removal. When a user calls close(), we call cancel() which posts pending
      65                 :     ops to the scheduler.
      66                 : 
      67                 :     CRITICAL: The posted ops must keep the impl alive until they complete.
      68                 :     Otherwise the scheduler would process a freed op (use-after-free). The
      69                 :     cancel() method captures shared_from_this() into op.impl_ptr before
      70                 :     posting. When the op completes, impl_ptr is cleared, allowing the impl
      71                 :     to be destroyed if no other references exist.
      72                 : 
      73                 :     Service Ownership
      74                 :     -----------------
      75                 :     select_socket_service owns all socket impls. destroy() removes the
      76                 :     shared_ptr from the map, but the impl may survive if ops still hold
      77                 :     impl_ptr refs. shutdown() closes all sockets and clears the map; any
      78                 :     in-flight ops will complete and release their refs.
      79                 : */
      80                 : 
      81                 : namespace boost::corosio::detail {
      82                 : 
      83                 : /** State for select socket service. */
      84                 : class select_socket_state
      85                 : {
      86                 : public:
      87 HIT         168 :     explicit select_socket_state(select_scheduler& sched) noexcept
      88             168 :         : sched_(sched)
      89                 :     {
      90             168 :     }
      91                 : 
      92                 :     select_scheduler& sched_;
      93                 :     std::mutex mutex_;
      94                 :     intrusive_list<select_socket> socket_list_;
      95                 :     std::unordered_map<select_socket*, std::shared_ptr<select_socket>>
      96                 :         socket_ptrs_;
      97                 : };
      98                 : 
      99                 : /** select socket service implementation.
     100                 : 
     101                 :     Inherits from socket_service to enable runtime polymorphism.
     102                 :     Uses key_type = socket_service for service lookup.
     103                 : */
     104                 : class BOOST_COROSIO_DECL select_socket_service final : public socket_service
     105                 : {
     106                 : public:
     107                 :     explicit select_socket_service(capy::execution_context& ctx);
     108                 :     ~select_socket_service() override;
     109                 : 
     110                 :     select_socket_service(select_socket_service const&)            = delete;
     111                 :     select_socket_service& operator=(select_socket_service const&) = delete;
     112                 : 
     113                 :     void shutdown() override;
     114                 : 
     115                 :     io_object::implementation* construct() override;
     116                 :     void destroy(io_object::implementation*) override;
     117                 :     void close(io_object::handle&) override;
     118                 :     std::error_code open_socket(
     119                 :         tcp_socket::implementation& impl,
     120                 :         int family,
     121                 :         int type,
     122                 :         int protocol) override;
     123                 : 
     124           10939 :     select_scheduler& scheduler() const noexcept
     125                 :     {
     126           10939 :         return state_->sched_;
     127                 :     }
     128                 :     void post(select_op* op);
     129                 :     void work_started() noexcept;
     130                 :     void work_finished() noexcept;
     131                 : 
     132                 : private:
     133                 :     std::unique_ptr<select_socket_state> state_;
     134                 : };
     135                 : 
     136                 : // Backward compatibility alias
     137                 : using select_sockets = select_socket_service;
     138                 : 
     139                 : inline void
     140              98 : select_op::canceller::operator()() const noexcept
     141                 : {
     142              98 :     op->cancel();
     143              98 : }
     144                 : 
     145                 : inline void
     146 MIS           0 : select_connect_op::cancel() noexcept
     147                 : {
     148               0 :     if (socket_impl_)
     149               0 :         socket_impl_->cancel_single_op(*this);
     150                 :     else
     151               0 :         request_cancel();
     152               0 : }
     153                 : 
     154                 : inline void
     155 HIT          98 : select_read_op::cancel() noexcept
     156                 : {
     157              98 :     if (socket_impl_)
     158              98 :         socket_impl_->cancel_single_op(*this);
     159                 :     else
     160 MIS           0 :         request_cancel();
     161 HIT          98 : }
     162                 : 
     163                 : inline void
     164 MIS           0 : select_write_op::cancel() noexcept
     165                 : {
     166               0 :     if (socket_impl_)
     167               0 :         socket_impl_->cancel_single_op(*this);
     168                 :     else
     169               0 :         request_cancel();
     170               0 : }
     171                 : 
     172                 : inline void
     173 HIT        3497 : select_connect_op::operator()()
     174                 : {
     175            3497 :     stop_cb.reset();
     176                 : 
     177            3497 :     bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
     178                 : 
     179                 :     // Cache endpoints on successful connect
     180            3497 :     if (success && socket_impl_)
     181                 :     {
     182            3495 :         endpoint local_ep;
     183            3495 :         sockaddr_storage local_storage{};
     184            3495 :         socklen_t local_len = sizeof(local_storage);
     185            3495 :         if (::getsockname(
     186            3495 :                 fd, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
     187                 :             0)
     188            3495 :             local_ep = from_sockaddr(local_storage);
     189            3495 :         static_cast<select_socket*>(socket_impl_)
     190            3495 :             ->set_endpoints(local_ep, target_endpoint);
     191                 :     }
     192                 : 
     193            3497 :     if (ec_out)
     194                 :     {
     195            3497 :         if (cancelled.load(std::memory_order_acquire))
     196 MIS           0 :             *ec_out = capy::error::canceled;
     197 HIT        3497 :         else if (errn != 0)
     198               2 :             *ec_out = make_err(errn);
     199                 :         else
     200            3495 :             *ec_out = {};
     201                 :     }
     202                 : 
     203            3497 :     if (bytes_out)
     204 MIS           0 :         *bytes_out = bytes_transferred;
     205                 : 
     206                 :     // Move to stack before destroying the frame
     207 HIT        3497 :     capy::executor_ref saved_ex(ex);
     208            3497 :     std::coroutine_handle<> saved_h(h);
     209            3497 :     impl_ptr.reset();
     210            3497 :     dispatch_coro(saved_ex, saved_h).resume();
     211            3497 : }
     212                 : 
     213           10511 : inline select_socket::select_socket(select_socket_service& svc) noexcept
     214           10511 :     : svc_(svc)
     215                 : {
     216           10511 : }
     217                 : 
     218                 : inline std::coroutine_handle<>
     219            3497 : select_socket::connect(
     220                 :     std::coroutine_handle<> h,
     221                 :     capy::executor_ref ex,
     222                 :     endpoint ep,
     223                 :     std::stop_token token,
     224                 :     std::error_code* ec)
     225                 : {
     226            3497 :     auto& op = conn_;
     227            3497 :     op.reset();
     228            3497 :     op.h               = h;
     229            3497 :     op.ex              = ex;
     230            3497 :     op.ec_out          = ec;
     231            3497 :     op.fd              = fd_;
     232            3497 :     op.target_endpoint = ep; // Store target for endpoint caching
     233            3497 :     op.start(token, this);
     234                 : 
     235            3497 :     sockaddr_storage storage{};
     236                 :     socklen_t addrlen =
     237            3497 :         detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
     238            3497 :     int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
     239                 : 
     240            3497 :     if (result == 0)
     241                 :     {
     242                 :         // Sync success — cache endpoints immediately
     243 MIS           0 :         sockaddr_storage local_storage{};
     244               0 :         socklen_t local_len = sizeof(local_storage);
     245               0 :         if (::getsockname(
     246               0 :                 fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
     247                 :             0)
     248               0 :             local_endpoint_ = detail::from_sockaddr(local_storage);
     249               0 :         remote_endpoint_ = ep;
     250                 : 
     251               0 :         op.complete(0, 0);
     252               0 :         op.impl_ptr = shared_from_this();
     253               0 :         svc_.post(&op);
     254                 :         // completion is always posted to scheduler queue, never inline.
     255               0 :         return std::noop_coroutine();
     256                 :     }
     257                 : 
     258 HIT        3497 :     if (errno == EINPROGRESS)
     259                 :     {
     260            3497 :         svc_.work_started();
     261            3497 :         op.impl_ptr = shared_from_this();
     262                 : 
     263                 :         // Set registering BEFORE register_fd to close the race window where
     264                 :         // reactor sees an event before we set registered. The reactor treats
     265                 :         // registering the same as registered when claiming the op.
     266            3497 :         op.registered.store(
     267                 :             select_registration_state::registering, std::memory_order_release);
     268            3497 :         svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
     269                 : 
     270                 :         // Transition to registered. If this fails, reactor or cancel already
     271                 :         // claimed the op (state is now unregistered), so we're done. However,
     272                 :         // we must still deregister the fd because cancel's deregister_fd may
     273                 :         // have run before our register_fd, leaving the fd orphaned.
     274            3497 :         auto expected = select_registration_state::registering;
     275            3497 :         if (!op.registered.compare_exchange_strong(
     276                 :                 expected, select_registration_state::registered,
     277                 :                 std::memory_order_acq_rel))
     278                 :         {
     279 MIS           0 :             svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
     280                 :             // completion is always posted to scheduler queue, never inline.
     281               0 :             return std::noop_coroutine();
     282                 :         }
     283                 : 
     284                 :         // If cancelled was set before we registered, handle it now.
     285 HIT        3497 :         if (op.cancelled.load(std::memory_order_acquire))
     286                 :         {
     287 MIS           0 :             auto prev = op.registered.exchange(
     288                 :                 select_registration_state::unregistered,
     289                 :                 std::memory_order_acq_rel);
     290               0 :             if (prev != select_registration_state::unregistered)
     291                 :             {
     292               0 :                 svc_.scheduler().deregister_fd(
     293                 :                     fd_, select_scheduler::event_write);
     294               0 :                 op.impl_ptr = shared_from_this();
     295               0 :                 svc_.post(&op);
     296               0 :                 svc_.work_finished();
     297                 :             }
     298                 :         }
     299                 :         // completion is always posted to scheduler queue, never inline.
     300 HIT        3497 :         return std::noop_coroutine();
     301                 :     }
     302                 : 
     303 MIS           0 :     op.complete(errno, 0);
     304               0 :     op.impl_ptr = shared_from_this();
     305               0 :     svc_.post(&op);
     306                 :     // completion is always posted to scheduler queue, never inline.
     307               0 :     return std::noop_coroutine();
     308                 : }
     309                 : 
     310                 : inline std::coroutine_handle<>
     311 HIT       77685 : select_socket::read_some(
     312                 :     std::coroutine_handle<> h,
     313                 :     capy::executor_ref ex,
     314                 :     buffer_param param,
     315                 :     std::stop_token token,
     316                 :     std::error_code* ec,
     317                 :     std::size_t* bytes_out)
     318                 : {
     319           77685 :     auto& op = rd_;
     320           77685 :     op.reset();
     321           77685 :     op.h         = h;
     322           77685 :     op.ex        = ex;
     323           77685 :     op.ec_out    = ec;
     324           77685 :     op.bytes_out = bytes_out;
     325           77685 :     op.fd        = fd_;
     326           77685 :     op.start(token, this);
     327                 : 
     328           77685 :     capy::mutable_buffer bufs[select_read_op::max_buffers];
     329           77685 :     op.iovec_count =
     330           77685 :         static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));
     331                 : 
     332           77685 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     333                 :     {
     334               1 :         op.empty_buffer_read = true;
     335               1 :         op.complete(0, 0);
     336               1 :         op.impl_ptr = shared_from_this();
     337               1 :         svc_.post(&op);
     338               1 :         return std::noop_coroutine();
     339                 :     }
     340                 : 
     341          155368 :     for (int i = 0; i < op.iovec_count; ++i)
     342                 :     {
     343           77684 :         op.iovecs[i].iov_base = bufs[i].data();
     344           77684 :         op.iovecs[i].iov_len  = bufs[i].size();
     345                 :     }
     346                 : 
     347           77684 :     ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
     348                 : 
     349           77684 :     if (n > 0)
     350                 :     {
     351           77401 :         op.complete(0, static_cast<std::size_t>(n));
     352           77401 :         op.impl_ptr = shared_from_this();
     353           77401 :         svc_.post(&op);
     354           77401 :         return std::noop_coroutine();
     355                 :     }
     356                 : 
     357             283 :     if (n == 0)
     358                 :     {
     359               5 :         op.complete(0, 0);
     360               5 :         op.impl_ptr = shared_from_this();
     361               5 :         svc_.post(&op);
     362               5 :         return std::noop_coroutine();
     363                 :     }
     364                 : 
     365             278 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     366                 :     {
     367             278 :         svc_.work_started();
     368             278 :         op.impl_ptr = shared_from_this();
     369                 : 
     370                 :         // Set registering BEFORE register_fd to close the race window where
     371                 :         // reactor sees an event before we set registered.
     372             278 :         op.registered.store(
     373                 :             select_registration_state::registering, std::memory_order_release);
     374             278 :         svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
     375                 : 
     376                 :         // Transition to registered. If this fails, reactor or cancel already
     377                 :         // claimed the op (state is now unregistered), so we're done. However,
     378                 :         // we must still deregister the fd because cancel's deregister_fd may
     379                 :         // have run before our register_fd, leaving the fd orphaned.
     380             278 :         auto expected = select_registration_state::registering;
     381             278 :         if (!op.registered.compare_exchange_strong(
     382                 :                 expected, select_registration_state::registered,
     383                 :                 std::memory_order_acq_rel))
     384                 :         {
     385 MIS           0 :             svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
     386               0 :             return std::noop_coroutine();
     387                 :         }
     388                 : 
     389                 :         // If cancelled was set before we registered, handle it now.
     390 HIT         278 :         if (op.cancelled.load(std::memory_order_acquire))
     391                 :         {
     392 MIS           0 :             auto prev = op.registered.exchange(
     393                 :                 select_registration_state::unregistered,
     394                 :                 std::memory_order_acq_rel);
     395               0 :             if (prev != select_registration_state::unregistered)
     396                 :             {
     397               0 :                 svc_.scheduler().deregister_fd(
     398                 :                     fd_, select_scheduler::event_read);
     399               0 :                 op.impl_ptr = shared_from_this();
     400               0 :                 svc_.post(&op);
     401               0 :                 svc_.work_finished();
     402                 :             }
     403                 :         }
     404 HIT         278 :         return std::noop_coroutine();
     405                 :     }
     406                 : 
     407 MIS           0 :     op.complete(errno, 0);
     408               0 :     op.impl_ptr = shared_from_this();
     409               0 :     svc_.post(&op);
     410               0 :     return std::noop_coroutine();
     411                 : }
     412                 : 
     413                 : inline std::coroutine_handle<>
     414 HIT       77524 : select_socket::write_some(
     415                 :     std::coroutine_handle<> h,
     416                 :     capy::executor_ref ex,
     417                 :     buffer_param param,
     418                 :     std::stop_token token,
     419                 :     std::error_code* ec,
     420                 :     std::size_t* bytes_out)
     421                 : {
     422           77524 :     auto& op = wr_;
     423           77524 :     op.reset();
     424           77524 :     op.h         = h;
     425           77524 :     op.ex        = ex;
     426           77524 :     op.ec_out    = ec;
     427           77524 :     op.bytes_out = bytes_out;
     428           77524 :     op.fd        = fd_;
     429           77524 :     op.start(token, this);
     430                 : 
     431           77524 :     capy::mutable_buffer bufs[select_write_op::max_buffers];
     432           77524 :     op.iovec_count =
     433           77524 :         static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));
     434                 : 
     435           77524 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     436                 :     {
     437               1 :         op.complete(0, 0);
     438               1 :         op.impl_ptr = shared_from_this();
     439               1 :         svc_.post(&op);
     440               1 :         return std::noop_coroutine();
     441                 :     }
     442                 : 
     443          155046 :     for (int i = 0; i < op.iovec_count; ++i)
     444                 :     {
     445           77523 :         op.iovecs[i].iov_base = bufs[i].data();
     446           77523 :         op.iovecs[i].iov_len  = bufs[i].size();
     447                 :     }
     448                 : 
     449           77523 :     msghdr msg{};
     450           77523 :     msg.msg_iov    = op.iovecs;
     451           77523 :     msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
     452                 : 
     453           77523 :     ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
     454                 : 
     455           77523 :     if (n > 0)
     456                 :     {
     457           77522 :         op.complete(0, static_cast<std::size_t>(n));
     458           77522 :         op.impl_ptr = shared_from_this();
     459           77522 :         svc_.post(&op);
     460           77522 :         return std::noop_coroutine();
     461                 :     }
     462                 : 
     463               1 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     464                 :     {
     465 MIS           0 :         svc_.work_started();
     466               0 :         op.impl_ptr = shared_from_this();
     467                 : 
     468                 :         // Set registering BEFORE register_fd to close the race window where
     469                 :         // reactor sees an event before we set registered.
     470               0 :         op.registered.store(
     471                 :             select_registration_state::registering, std::memory_order_release);
     472               0 :         svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
     473                 : 
     474                 :         // Transition to registered. If this fails, reactor or cancel already
     475                 :         // claimed the op (state is now unregistered), so we're done. However,
     476                 :         // we must still deregister the fd because cancel's deregister_fd may
     477                 :         // have run before our register_fd, leaving the fd orphaned.
     478               0 :         auto expected = select_registration_state::registering;
     479               0 :         if (!op.registered.compare_exchange_strong(
     480                 :                 expected, select_registration_state::registered,
     481                 :                 std::memory_order_acq_rel))
     482                 :         {
     483               0 :             svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
     484               0 :             return std::noop_coroutine();
     485                 :         }
     486                 : 
     487                 :         // If cancelled was set before we registered, handle it now.
     488               0 :         if (op.cancelled.load(std::memory_order_acquire))
     489                 :         {
     490               0 :             auto prev = op.registered.exchange(
     491                 :                 select_registration_state::unregistered,
     492                 :                 std::memory_order_acq_rel);
     493               0 :             if (prev != select_registration_state::unregistered)
     494                 :             {
     495               0 :                 svc_.scheduler().deregister_fd(
     496                 :                     fd_, select_scheduler::event_write);
     497               0 :                 op.impl_ptr = shared_from_this();
     498               0 :                 svc_.post(&op);
     499               0 :                 svc_.work_finished();
     500                 :             }
     501                 :         }
     502               0 :         return std::noop_coroutine();
     503                 :     }
     504                 : 
     505 HIT           1 :     op.complete(errno ? errno : EIO, 0);
     506               1 :     op.impl_ptr = shared_from_this();
     507               1 :     svc_.post(&op);
     508               1 :     return std::noop_coroutine();
     509                 : }
     510                 : 
     511                 : inline std::error_code
     512               3 : select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
     513                 : {
     514                 :     int how;
     515               3 :     switch (what)
     516                 :     {
     517               1 :     case tcp_socket::shutdown_receive:
     518               1 :         how = SHUT_RD;
     519               1 :         break;
     520               1 :     case tcp_socket::shutdown_send:
     521               1 :         how = SHUT_WR;
     522               1 :         break;
     523               1 :     case tcp_socket::shutdown_both:
     524               1 :         how = SHUT_RDWR;
     525               1 :         break;
     526 MIS           0 :     default:
     527               0 :         return make_err(EINVAL);
     528                 :     }
     529 HIT           3 :     if (::shutdown(fd_, how) != 0)
     530 MIS           0 :         return make_err(errno);
     531 HIT           3 :     return {};
     532                 : }
     533                 : 
     534                 : inline std::error_code
     535              28 : select_socket::set_option(
     536                 :     int level, int optname, void const* data, std::size_t size) noexcept
     537                 : {
     538              28 :     if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
     539                 :         0)
     540 MIS           0 :         return make_err(errno);
     541 HIT          28 :     return {};
     542                 : }
     543                 : 
     544                 : inline std::error_code
     545              31 : select_socket::get_option(
     546                 :     int level, int optname, void* data, std::size_t* size) const noexcept
     547                 : {
     548              31 :     socklen_t len = static_cast<socklen_t>(*size);
     549              31 :     if (::getsockopt(fd_, level, optname, data, &len) != 0)
     550 MIS           0 :         return make_err(errno);
     551 HIT          31 :     *size = static_cast<std::size_t>(len);
     552              31 :     return {};
     553                 : }
     554                 : 
     555                 : inline void
     556             173 : select_socket::cancel() noexcept
     557                 : {
     558             173 :     auto self = weak_from_this().lock();
     559             173 :     if (!self)
     560 MIS           0 :         return;
     561                 : 
     562 HIT         519 :     auto cancel_op = [this, &self](select_op& op, int events) {
     563             519 :         auto prev = op.registered.exchange(
     564                 :             select_registration_state::unregistered, std::memory_order_acq_rel);
     565             519 :         op.request_cancel();
     566             519 :         if (prev != select_registration_state::unregistered)
     567                 :         {
     568              90 :             svc_.scheduler().deregister_fd(fd_, events);
     569              90 :             op.impl_ptr = self;
     570              90 :             svc_.post(&op);
     571              90 :             svc_.work_finished();
     572                 :         }
     573             692 :     };
     574                 : 
     575             173 :     cancel_op(conn_, select_scheduler::event_write);
     576             173 :     cancel_op(rd_, select_scheduler::event_read);
     577             173 :     cancel_op(wr_, select_scheduler::event_write);
     578             173 : }
     579                 : 
     580                 : inline void
     581              98 : select_socket::cancel_single_op(select_op& op) noexcept
     582                 : {
     583              98 :     auto self = weak_from_this().lock();
     584              98 :     if (!self)
     585 MIS           0 :         return;
     586                 : 
     587                 :     // Called from stop_token callback to cancel a specific pending operation.
     588 HIT          98 :     auto prev = op.registered.exchange(
     589                 :         select_registration_state::unregistered, std::memory_order_acq_rel);
     590              98 :     op.request_cancel();
     591                 : 
     592              98 :     if (prev != select_registration_state::unregistered)
     593                 :     {
     594                 :         // Determine which event type to deregister
     595              66 :         int events = 0;
     596              66 :         if (&op == &conn_ || &op == &wr_)
     597 MIS           0 :             events = select_scheduler::event_write;
     598 HIT          66 :         else if (&op == &rd_)
     599              66 :             events = select_scheduler::event_read;
     600                 : 
     601              66 :         svc_.scheduler().deregister_fd(fd_, events);
     602                 : 
     603              66 :         op.impl_ptr = self;
     604              66 :         svc_.post(&op);
     605              66 :         svc_.work_finished();
     606                 :     }
     607              98 : }
     608                 : 
     609                 : inline void
     610           31541 : select_socket::close_socket() noexcept
     611                 : {
     612           31541 :     auto self = weak_from_this().lock();
     613           31541 :     if (self)
     614                 :     {
     615           94623 :         auto cancel_op = [this, &self](select_op& op, int events) {
     616           94623 :             auto prev = op.registered.exchange(
     617                 :                 select_registration_state::unregistered,
     618                 :                 std::memory_order_acq_rel);
     619           94623 :             op.request_cancel();
     620           94623 :             if (prev != select_registration_state::unregistered)
     621                 :             {
     622               1 :                 svc_.scheduler().deregister_fd(fd_, events);
     623               1 :                 op.impl_ptr = self;
     624               1 :                 svc_.post(&op);
     625               1 :                 svc_.work_finished();
     626                 :             }
     627          126164 :         };
     628                 : 
     629           31541 :         cancel_op(conn_, select_scheduler::event_write);
     630           31541 :         cancel_op(rd_, select_scheduler::event_read);
     631           31541 :         cancel_op(wr_, select_scheduler::event_write);
     632                 :     }
     633                 : 
     634           31541 :     if (fd_ >= 0)
     635                 :     {
     636            7007 :         svc_.scheduler().deregister_fd(
     637                 :             fd_, select_scheduler::event_read | select_scheduler::event_write);
     638            7007 :         ::close(fd_);
     639            7007 :         fd_ = -1;
     640                 :     }
     641                 : 
     642           31541 :     local_endpoint_  = endpoint{};
     643           31541 :     remote_endpoint_ = endpoint{};
     644           31541 : }
     645                 : 
     646             168 : inline select_socket_service::select_socket_service(
     647             168 :     capy::execution_context& ctx)
     648             168 :     : state_(
     649                 :           std::make_unique<select_socket_state>(
     650             168 :               ctx.use_service<select_scheduler>()))
     651                 : {
     652             168 : }
     653                 : 
     654             336 : inline select_socket_service::~select_socket_service() {}
     655                 : 
     656                 : inline void
     657             168 : select_socket_service::shutdown()
     658                 : {
     659             168 :     std::lock_guard lock(state_->mutex_);
     660                 : 
     661             168 :     while (auto* impl = state_->socket_list_.pop_front())
     662 MIS           0 :         impl->close_socket();
     663                 : 
     664                 :     // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
     665                 :     // drains completed_ops_, calling destroy() on each queued op. Letting
     666                 :     // ~state_ release the ptrs (during service destruction, after scheduler
     667                 :     // shutdown) keeps every impl alive until all ops have been drained.
     668 HIT         168 : }
     669                 : 
     670                 : inline io_object::implementation*
     671           10511 : select_socket_service::construct()
     672                 : {
     673           10511 :     auto impl = std::make_shared<select_socket>(*this);
     674           10511 :     auto* raw = impl.get();
     675                 : 
     676                 :     {
     677           10511 :         std::lock_guard lock(state_->mutex_);
     678           10511 :         state_->socket_list_.push_back(raw);
     679           10511 :         state_->socket_ptrs_.emplace(raw, std::move(impl));
     680           10511 :     }
     681                 : 
     682           10511 :     return raw;
     683           10511 : }
     684                 : 
     685                 : inline void
     686           10511 : select_socket_service::destroy(io_object::implementation* impl)
     687                 : {
     688           10511 :     auto* select_impl = static_cast<select_socket*>(impl);
     689           10511 :     select_impl->close_socket();
     690           10511 :     std::lock_guard lock(state_->mutex_);
     691           10511 :     state_->socket_list_.remove(select_impl);
     692           10511 :     state_->socket_ptrs_.erase(select_impl);
     693           10511 : }
     694                 : 
     695                 : inline std::error_code
     696            3512 : select_socket_service::open_socket(
     697                 :     tcp_socket::implementation& impl, int family, int type, int protocol)
     698                 : {
     699            3512 :     auto* select_impl = static_cast<select_socket*>(&impl);
     700            3512 :     select_impl->close_socket();
     701                 : 
     702            3512 :     int fd = ::socket(family, type, protocol);
     703            3512 :     if (fd < 0)
     704 MIS           0 :         return make_err(errno);
     705                 : 
     706 HIT        3512 :     if (family == AF_INET6)
     707                 :     {
     708               5 :         int one = 1;
     709               5 :         ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
     710                 :     }
     711                 : 
     712                 :     // Set non-blocking and close-on-exec
     713            3512 :     int flags = ::fcntl(fd, F_GETFL, 0);
     714            3512 :     if (flags == -1)
     715                 :     {
     716 MIS           0 :         int errn = errno;
     717               0 :         ::close(fd);
     718               0 :         return make_err(errn);
     719                 :     }
     720 HIT        3512 :     if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
     721                 :     {
     722 MIS           0 :         int errn = errno;
     723               0 :         ::close(fd);
     724               0 :         return make_err(errn);
     725                 :     }
     726 HIT        3512 :     if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
     727                 :     {
     728 MIS           0 :         int errn = errno;
     729               0 :         ::close(fd);
     730               0 :         return make_err(errn);
     731                 :     }
     732                 : 
     733                 :     // Check fd is within select() limits
     734 HIT        3512 :     if (fd >= FD_SETSIZE)
     735                 :     {
     736 MIS           0 :         ::close(fd);
     737               0 :         return make_err(EMFILE); // Too many open files
     738                 :     }
     739                 : 
     740 HIT        3512 :     select_impl->fd_ = fd;
     741            3512 :     return {};
     742                 : }
     743                 : 
     744                 : inline void
     745           17518 : select_socket_service::close(io_object::handle& h)
     746                 : {
     747           17518 :     static_cast<select_socket*>(h.get())->close_socket();
     748           17518 : }
     749                 : 
     750                 : inline void
     751          155088 : select_socket_service::post(select_op* op)
     752                 : {
     753          155088 :     state_->sched_.post(op);
     754          155088 : }
     755                 : 
     756                 : inline void
     757            3775 : select_socket_service::work_started() noexcept
     758                 : {
     759            3775 :     state_->sched_.work_started();
     760            3775 : }
     761                 : 
     762                 : inline void
     763             157 : select_socket_service::work_finished() noexcept
     764                 : {
     765             157 :     state_->sched_.work_finished();
     766             157 : }
     767                 : 
     768                 : } // namespace boost::corosio::detail
     769                 : 
     770                 : #endif // BOOST_COROSIO_HAS_SELECT
     771                 : 
     772                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
        

Generated by: LCOV version 2.3