1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/corosio
7  
// Official repository: https://github.com/cppalliance/corosio
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_SELECT
15  
#if BOOST_COROSIO_HAS_SELECT
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  
#include <boost/corosio/detail/socket_service.hpp>
19  
#include <boost/corosio/detail/socket_service.hpp>
20  

20  

21  
#include <boost/corosio/native/detail/select/select_socket.hpp>
21  
#include <boost/corosio/native/detail/select/select_socket.hpp>
22  
#include <boost/corosio/native/detail/select/select_scheduler.hpp>
22  
#include <boost/corosio/native/detail/select/select_scheduler.hpp>
23  

23  

24 -
#include <boost/corosio/detail/endpoint_convert.hpp>
24 +
#include <boost/corosio/native/detail/endpoint_convert.hpp>
25  
#include <boost/corosio/detail/dispatch_coro.hpp>
25  
#include <boost/corosio/detail/dispatch_coro.hpp>
26 -
#include <boost/corosio/detail/make_err.hpp>
26 +
#include <boost/corosio/native/detail/make_err.hpp>
27  

27  

28  
#include <boost/corosio/detail/except.hpp>
28  
#include <boost/corosio/detail/except.hpp>
29  

29  

30  
#include <boost/capy/buffers.hpp>
30  
#include <boost/capy/buffers.hpp>
31  

31  

32  
#include <errno.h>
32  
#include <errno.h>
33  
#include <fcntl.h>
33  
#include <fcntl.h>
34  
#include <netinet/in.h>
34  
#include <netinet/in.h>
35  
#include <netinet/tcp.h>
35  
#include <netinet/tcp.h>
36  
#include <sys/socket.h>
36  
#include <sys/socket.h>
37  
#include <unistd.h>
37  
#include <unistd.h>
38  

38  

39  
#include <memory>
39  
#include <memory>
40  
#include <mutex>
40  
#include <mutex>
41  
#include <unordered_map>
41  
#include <unordered_map>
42  

42  

/*
    select Socket Implementation
    ============================

    This mirrors the epoll_sockets design for behavioral consistency.
    Each I/O operation follows the same pattern:
      1. Try the syscall immediately (non-blocking socket)
      2. If it succeeds or fails with a real error, post to completion queue
      3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait

    Cancellation
    ------------
    See op.hpp for the completion/cancellation race handling via the
    `registered` atomic. cancel() must complete pending operations (post
    them with cancelled flag) so coroutines waiting on them can resume.
    close_socket() calls cancel() first to ensure this.

    Impl Lifetime with shared_ptr
    -----------------------------
    Socket impls use enable_shared_from_this. The service owns impls via
    shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
    removal. When a user calls close(), we call cancel() which posts pending
    ops to the scheduler.

    CRITICAL: The posted ops must keep the impl alive until they complete.
    Otherwise the scheduler would process a freed op (use-after-free). The
    cancel() method captures shared_from_this() into op.impl_ptr before
    posting. When the op completes, impl_ptr is cleared, allowing the impl
    to be destroyed if no other references exist.

    Service Ownership
    -----------------
    select_socket_service owns all socket impls. destroy() removes the
    shared_ptr from the map, but the impl may survive if ops still hold
    impl_ptr refs. shutdown() closes all sockets and clears the map; any
    in-flight ops will complete and release their refs.
*/
80  

80  

81  
namespace boost::corosio::detail {
81  
namespace boost::corosio::detail {
82  

82  

83  
/** State for select socket service. */
83  
/** State for select socket service. */
84  
class select_socket_state
84  
class select_socket_state
85  
{
85  
{
86  
public:
86  
public:
87  
    explicit select_socket_state(select_scheduler& sched) noexcept
87  
    explicit select_socket_state(select_scheduler& sched) noexcept
88  
        : sched_(sched)
88  
        : sched_(sched)
89  
    {
89  
    {
90  
    }
90  
    }
91  

91  

92  
    select_scheduler& sched_;
92  
    select_scheduler& sched_;
93  
    std::mutex mutex_;
93  
    std::mutex mutex_;
94  
    intrusive_list<select_socket> socket_list_;
94  
    intrusive_list<select_socket> socket_list_;
95  
    std::unordered_map<select_socket*, std::shared_ptr<select_socket>>
95  
    std::unordered_map<select_socket*, std::shared_ptr<select_socket>>
96  
        socket_ptrs_;
96  
        socket_ptrs_;
97  
};
97  
};
98  

98  

99  
/** select socket service implementation.
99  
/** select socket service implementation.
100  

100  

101  
    Inherits from socket_service to enable runtime polymorphism.
101  
    Inherits from socket_service to enable runtime polymorphism.
102  
    Uses key_type = socket_service for service lookup.
102  
    Uses key_type = socket_service for service lookup.
103  
*/
103  
*/
104  
class BOOST_COROSIO_DECL select_socket_service final : public socket_service
104  
class BOOST_COROSIO_DECL select_socket_service final : public socket_service
105  
{
105  
{
106  
public:
106  
public:
107  
    explicit select_socket_service(capy::execution_context& ctx);
107  
    explicit select_socket_service(capy::execution_context& ctx);
108  
    ~select_socket_service() override;
108  
    ~select_socket_service() override;
109  

109  

110  
    select_socket_service(select_socket_service const&)            = delete;
110  
    select_socket_service(select_socket_service const&)            = delete;
111  
    select_socket_service& operator=(select_socket_service const&) = delete;
111  
    select_socket_service& operator=(select_socket_service const&) = delete;
112  

112  

113  
    void shutdown() override;
113  
    void shutdown() override;
114  

114  

115  
    io_object::implementation* construct() override;
115  
    io_object::implementation* construct() override;
116  
    void destroy(io_object::implementation*) override;
116  
    void destroy(io_object::implementation*) override;
117  
    void close(io_object::handle&) override;
117  
    void close(io_object::handle&) override;
118 -
    std::error_code
118 +
    std::error_code open_socket(
119 -
    open_socket(tcp_socket::implementation& impl,
119 +
        tcp_socket::implementation& impl,
120 -
                int family, int type, int protocol) override;
120 +
        int family,
 
121 +
        int type,
 
122 +
        int protocol) override;
121  

123  

122  
    select_scheduler& scheduler() const noexcept
124  
    select_scheduler& scheduler() const noexcept
123  
    {
125  
    {
124  
        return state_->sched_;
126  
        return state_->sched_;
125  
    }
127  
    }
126  
    void post(select_op* op);
128  
    void post(select_op* op);
127  
    void work_started() noexcept;
129  
    void work_started() noexcept;
128  
    void work_finished() noexcept;
130  
    void work_finished() noexcept;
129  

131  

130  
private:
132  
private:
131  
    std::unique_ptr<select_socket_state> state_;
133  
    std::unique_ptr<select_socket_state> state_;
132  
};
134  
};
133  

135  

134  
// Backward compatibility alias
136  
// Backward compatibility alias
135  
using select_sockets = select_socket_service;
137  
using select_sockets = select_socket_service;
136  

138  

137  
inline void
139  
inline void
138  
select_op::canceller::operator()() const noexcept
140  
select_op::canceller::operator()() const noexcept
139  
{
141  
{
140  
    op->cancel();
142  
    op->cancel();
141  
}
143  
}
142  

144  

143  
inline void
145  
inline void
144  
select_connect_op::cancel() noexcept
146  
select_connect_op::cancel() noexcept
145  
{
147  
{
146  
    if (socket_impl_)
148  
    if (socket_impl_)
147  
        socket_impl_->cancel_single_op(*this);
149  
        socket_impl_->cancel_single_op(*this);
148  
    else
150  
    else
149  
        request_cancel();
151  
        request_cancel();
150  
}
152  
}
151  

153  

152  
inline void
154  
inline void
153  
select_read_op::cancel() noexcept
155  
select_read_op::cancel() noexcept
154  
{
156  
{
155  
    if (socket_impl_)
157  
    if (socket_impl_)
156  
        socket_impl_->cancel_single_op(*this);
158  
        socket_impl_->cancel_single_op(*this);
157  
    else
159  
    else
158  
        request_cancel();
160  
        request_cancel();
159  
}
161  
}
160  

162  

161  
inline void
163  
inline void
162  
select_write_op::cancel() noexcept
164  
select_write_op::cancel() noexcept
163  
{
165  
{
164  
    if (socket_impl_)
166  
    if (socket_impl_)
165  
        socket_impl_->cancel_single_op(*this);
167  
        socket_impl_->cancel_single_op(*this);
166  
    else
168  
    else
167  
        request_cancel();
169  
        request_cancel();
168  
}
170  
}
169  

171  

170  
inline void
172  
inline void
171  
select_connect_op::operator()()
173  
select_connect_op::operator()()
172  
{
174  
{
173  
    stop_cb.reset();
175  
    stop_cb.reset();
174  

176  

175  
    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
177  
    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
176  

178  

177  
    // Cache endpoints on successful connect
179  
    // Cache endpoints on successful connect
178  
    if (success && socket_impl_)
180  
    if (success && socket_impl_)
179  
    {
181  
    {
180  
        endpoint local_ep;
182  
        endpoint local_ep;
181  
        sockaddr_storage local_storage{};
183  
        sockaddr_storage local_storage{};
182  
        socklen_t local_len = sizeof(local_storage);
184  
        socklen_t local_len = sizeof(local_storage);
183  
        if (::getsockname(
185  
        if (::getsockname(
184 -
                fd, reinterpret_cast<sockaddr*>(&local_storage),
186 +
                fd, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
185 -
                &local_len) == 0)
187 +
            0)
186  
            local_ep = from_sockaddr(local_storage);
188  
            local_ep = from_sockaddr(local_storage);
187  
        static_cast<select_socket*>(socket_impl_)
189  
        static_cast<select_socket*>(socket_impl_)
188  
            ->set_endpoints(local_ep, target_endpoint);
190  
            ->set_endpoints(local_ep, target_endpoint);
189  
    }
191  
    }
190  

192  

191  
    if (ec_out)
193  
    if (ec_out)
192  
    {
194  
    {
193  
        if (cancelled.load(std::memory_order_acquire))
195  
        if (cancelled.load(std::memory_order_acquire))
194  
            *ec_out = capy::error::canceled;
196  
            *ec_out = capy::error::canceled;
195  
        else if (errn != 0)
197  
        else if (errn != 0)
196  
            *ec_out = make_err(errn);
198  
            *ec_out = make_err(errn);
197  
        else
199  
        else
198  
            *ec_out = {};
200  
            *ec_out = {};
199  
    }
201  
    }
200  

202  

201  
    if (bytes_out)
203  
    if (bytes_out)
202  
        *bytes_out = bytes_transferred;
204  
        *bytes_out = bytes_transferred;
203  

205  

204  
    // Move to stack before destroying the frame
206  
    // Move to stack before destroying the frame
205  
    capy::executor_ref saved_ex(ex);
207  
    capy::executor_ref saved_ex(ex);
206  
    std::coroutine_handle<> saved_h(h);
208  
    std::coroutine_handle<> saved_h(h);
207  
    impl_ptr.reset();
209  
    impl_ptr.reset();
208  
    dispatch_coro(saved_ex, saved_h).resume();
210  
    dispatch_coro(saved_ex, saved_h).resume();
209  
}
211  
}
210  

212  

211  
inline select_socket::select_socket(select_socket_service& svc) noexcept
213  
inline select_socket::select_socket(select_socket_service& svc) noexcept
212  
    : svc_(svc)
214  
    : svc_(svc)
213  
{
215  
{
214  
}
216  
}
215  

217  

216  
inline std::coroutine_handle<>
218  
inline std::coroutine_handle<>
217  
select_socket::connect(
219  
select_socket::connect(
218  
    std::coroutine_handle<> h,
220  
    std::coroutine_handle<> h,
219  
    capy::executor_ref ex,
221  
    capy::executor_ref ex,
220  
    endpoint ep,
222  
    endpoint ep,
221  
    std::stop_token token,
223  
    std::stop_token token,
222  
    std::error_code* ec)
224  
    std::error_code* ec)
223  
{
225  
{
224  
    auto& op = conn_;
226  
    auto& op = conn_;
225  
    op.reset();
227  
    op.reset();
226  
    op.h               = h;
228  
    op.h               = h;
227  
    op.ex              = ex;
229  
    op.ex              = ex;
228  
    op.ec_out          = ec;
230  
    op.ec_out          = ec;
229  
    op.fd              = fd_;
231  
    op.fd              = fd_;
230  
    op.target_endpoint = ep; // Store target for endpoint caching
232  
    op.target_endpoint = ep; // Store target for endpoint caching
231  
    op.start(token, this);
233  
    op.start(token, this);
232  

234  

233  
    sockaddr_storage storage{};
235  
    sockaddr_storage storage{};
234  
    socklen_t addrlen =
236  
    socklen_t addrlen =
235  
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
237  
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
236 -
    int result =
238 +
    int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
237 -
        ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
 
238  

239  

239  
    if (result == 0)
240  
    if (result == 0)
240  
    {
241  
    {
241  
        // Sync success — cache endpoints immediately
242  
        // Sync success — cache endpoints immediately
242  
        sockaddr_storage local_storage{};
243  
        sockaddr_storage local_storage{};
243  
        socklen_t local_len = sizeof(local_storage);
244  
        socklen_t local_len = sizeof(local_storage);
244  
        if (::getsockname(
245  
        if (::getsockname(
245 -
                fd_, reinterpret_cast<sockaddr*>(&local_storage),
246 +
                fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
246 -
                &local_len) == 0)
247 +
            0)
247  
            local_endpoint_ = detail::from_sockaddr(local_storage);
248  
            local_endpoint_ = detail::from_sockaddr(local_storage);
248  
        remote_endpoint_ = ep;
249  
        remote_endpoint_ = ep;
249  

250  

250  
        op.complete(0, 0);
251  
        op.complete(0, 0);
251  
        op.impl_ptr = shared_from_this();
252  
        op.impl_ptr = shared_from_this();
252  
        svc_.post(&op);
253  
        svc_.post(&op);
253  
        // completion is always posted to scheduler queue, never inline.
254  
        // completion is always posted to scheduler queue, never inline.
254  
        return std::noop_coroutine();
255  
        return std::noop_coroutine();
255  
    }
256  
    }
256  

257  

257  
    if (errno == EINPROGRESS)
258  
    if (errno == EINPROGRESS)
258  
    {
259  
    {
259  
        svc_.work_started();
260  
        svc_.work_started();
260  
        op.impl_ptr = shared_from_this();
261  
        op.impl_ptr = shared_from_this();
261  

262  

262  
        // Set registering BEFORE register_fd to close the race window where
263  
        // Set registering BEFORE register_fd to close the race window where
263  
        // reactor sees an event before we set registered. The reactor treats
264  
        // reactor sees an event before we set registered. The reactor treats
264  
        // registering the same as registered when claiming the op.
265  
        // registering the same as registered when claiming the op.
265  
        op.registered.store(
266  
        op.registered.store(
266  
            select_registration_state::registering, std::memory_order_release);
267  
            select_registration_state::registering, std::memory_order_release);
267  
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
268  
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
268  

269  

269  
        // Transition to registered. If this fails, reactor or cancel already
270  
        // Transition to registered. If this fails, reactor or cancel already
270  
        // claimed the op (state is now unregistered), so we're done. However,
271  
        // claimed the op (state is now unregistered), so we're done. However,
271  
        // we must still deregister the fd because cancel's deregister_fd may
272  
        // we must still deregister the fd because cancel's deregister_fd may
272  
        // have run before our register_fd, leaving the fd orphaned.
273  
        // have run before our register_fd, leaving the fd orphaned.
273  
        auto expected = select_registration_state::registering;
274  
        auto expected = select_registration_state::registering;
274  
        if (!op.registered.compare_exchange_strong(
275  
        if (!op.registered.compare_exchange_strong(
275  
                expected, select_registration_state::registered,
276  
                expected, select_registration_state::registered,
276  
                std::memory_order_acq_rel))
277  
                std::memory_order_acq_rel))
277  
        {
278  
        {
278  
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
279  
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
279  
            // completion is always posted to scheduler queue, never inline.
280  
            // completion is always posted to scheduler queue, never inline.
280  
            return std::noop_coroutine();
281  
            return std::noop_coroutine();
281  
        }
282  
        }
282  

283  

283  
        // If cancelled was set before we registered, handle it now.
284  
        // If cancelled was set before we registered, handle it now.
284  
        if (op.cancelled.load(std::memory_order_acquire))
285  
        if (op.cancelled.load(std::memory_order_acquire))
285  
        {
286  
        {
286  
            auto prev = op.registered.exchange(
287  
            auto prev = op.registered.exchange(
287  
                select_registration_state::unregistered,
288  
                select_registration_state::unregistered,
288  
                std::memory_order_acq_rel);
289  
                std::memory_order_acq_rel);
289  
            if (prev != select_registration_state::unregistered)
290  
            if (prev != select_registration_state::unregistered)
290  
            {
291  
            {
291  
                svc_.scheduler().deregister_fd(
292  
                svc_.scheduler().deregister_fd(
292  
                    fd_, select_scheduler::event_write);
293  
                    fd_, select_scheduler::event_write);
293  
                op.impl_ptr = shared_from_this();
294  
                op.impl_ptr = shared_from_this();
294  
                svc_.post(&op);
295  
                svc_.post(&op);
295  
                svc_.work_finished();
296  
                svc_.work_finished();
296  
            }
297  
            }
297  
        }
298  
        }
298  
        // completion is always posted to scheduler queue, never inline.
299  
        // completion is always posted to scheduler queue, never inline.
299  
        return std::noop_coroutine();
300  
        return std::noop_coroutine();
300  
    }
301  
    }
301  

302  

302  
    op.complete(errno, 0);
303  
    op.complete(errno, 0);
303  
    op.impl_ptr = shared_from_this();
304  
    op.impl_ptr = shared_from_this();
304  
    svc_.post(&op);
305  
    svc_.post(&op);
305  
    // completion is always posted to scheduler queue, never inline.
306  
    // completion is always posted to scheduler queue, never inline.
306  
    return std::noop_coroutine();
307  
    return std::noop_coroutine();
307  
}
308  
}
308  

309  

309  
inline std::coroutine_handle<>
310  
inline std::coroutine_handle<>
310  
select_socket::read_some(
311  
select_socket::read_some(
311  
    std::coroutine_handle<> h,
312  
    std::coroutine_handle<> h,
312  
    capy::executor_ref ex,
313  
    capy::executor_ref ex,
313 -
    io_buffer_param param,
314 +
    buffer_param param,
314  
    std::stop_token token,
315  
    std::stop_token token,
315  
    std::error_code* ec,
316  
    std::error_code* ec,
316  
    std::size_t* bytes_out)
317  
    std::size_t* bytes_out)
317  
{
318  
{
318  
    auto& op = rd_;
319  
    auto& op = rd_;
319  
    op.reset();
320  
    op.reset();
320  
    op.h         = h;
321  
    op.h         = h;
321  
    op.ex        = ex;
322  
    op.ex        = ex;
322  
    op.ec_out    = ec;
323  
    op.ec_out    = ec;
323  
    op.bytes_out = bytes_out;
324  
    op.bytes_out = bytes_out;
324  
    op.fd        = fd_;
325  
    op.fd        = fd_;
325  
    op.start(token, this);
326  
    op.start(token, this);
326  

327  

327  
    capy::mutable_buffer bufs[select_read_op::max_buffers];
328  
    capy::mutable_buffer bufs[select_read_op::max_buffers];
328  
    op.iovec_count =
329  
    op.iovec_count =
329  
        static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));
330  
        static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));
330  

331  

331  
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
332  
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
332  
    {
333  
    {
333  
        op.empty_buffer_read = true;
334  
        op.empty_buffer_read = true;
334  
        op.complete(0, 0);
335  
        op.complete(0, 0);
335  
        op.impl_ptr = shared_from_this();
336  
        op.impl_ptr = shared_from_this();
336  
        svc_.post(&op);
337  
        svc_.post(&op);
337  
        return std::noop_coroutine();
338  
        return std::noop_coroutine();
338  
    }
339  
    }
339  

340  

340  
    for (int i = 0; i < op.iovec_count; ++i)
341  
    for (int i = 0; i < op.iovec_count; ++i)
341  
    {
342  
    {
342  
        op.iovecs[i].iov_base = bufs[i].data();
343  
        op.iovecs[i].iov_base = bufs[i].data();
343  
        op.iovecs[i].iov_len  = bufs[i].size();
344  
        op.iovecs[i].iov_len  = bufs[i].size();
344  
    }
345  
    }
345  

346  

346  
    ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
347  
    ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
347  

348  

348  
    if (n > 0)
349  
    if (n > 0)
349  
    {
350  
    {
350  
        op.complete(0, static_cast<std::size_t>(n));
351  
        op.complete(0, static_cast<std::size_t>(n));
351  
        op.impl_ptr = shared_from_this();
352  
        op.impl_ptr = shared_from_this();
352  
        svc_.post(&op);
353  
        svc_.post(&op);
353  
        return std::noop_coroutine();
354  
        return std::noop_coroutine();
354  
    }
355  
    }
355  

356  

356  
    if (n == 0)
357  
    if (n == 0)
357  
    {
358  
    {
358  
        op.complete(0, 0);
359  
        op.complete(0, 0);
359  
        op.impl_ptr = shared_from_this();
360  
        op.impl_ptr = shared_from_this();
360  
        svc_.post(&op);
361  
        svc_.post(&op);
361  
        return std::noop_coroutine();
362  
        return std::noop_coroutine();
362  
    }
363  
    }
363  

364  

364  
    if (errno == EAGAIN || errno == EWOULDBLOCK)
365  
    if (errno == EAGAIN || errno == EWOULDBLOCK)
365  
    {
366  
    {
366  
        svc_.work_started();
367  
        svc_.work_started();
367  
        op.impl_ptr = shared_from_this();
368  
        op.impl_ptr = shared_from_this();
368  

369  

369  
        // Set registering BEFORE register_fd to close the race window where
370  
        // Set registering BEFORE register_fd to close the race window where
370  
        // reactor sees an event before we set registered.
371  
        // reactor sees an event before we set registered.
371  
        op.registered.store(
372  
        op.registered.store(
372  
            select_registration_state::registering, std::memory_order_release);
373  
            select_registration_state::registering, std::memory_order_release);
373  
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
374  
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
374  

375  

375  
        // Transition to registered. If this fails, reactor or cancel already
376  
        // Transition to registered. If this fails, reactor or cancel already
376  
        // claimed the op (state is now unregistered), so we're done. However,
377  
        // claimed the op (state is now unregistered), so we're done. However,
377  
        // we must still deregister the fd because cancel's deregister_fd may
378  
        // we must still deregister the fd because cancel's deregister_fd may
378  
        // have run before our register_fd, leaving the fd orphaned.
379  
        // have run before our register_fd, leaving the fd orphaned.
379  
        auto expected = select_registration_state::registering;
380  
        auto expected = select_registration_state::registering;
380  
        if (!op.registered.compare_exchange_strong(
381  
        if (!op.registered.compare_exchange_strong(
381  
                expected, select_registration_state::registered,
382  
                expected, select_registration_state::registered,
382  
                std::memory_order_acq_rel))
383  
                std::memory_order_acq_rel))
383  
        {
384  
        {
384  
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
385  
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
385  
            return std::noop_coroutine();
386  
            return std::noop_coroutine();
386  
        }
387  
        }
387  

388  

388  
        // If cancelled was set before we registered, handle it now.
389  
        // If cancelled was set before we registered, handle it now.
389  
        if (op.cancelled.load(std::memory_order_acquire))
390  
        if (op.cancelled.load(std::memory_order_acquire))
390  
        {
391  
        {
391  
            auto prev = op.registered.exchange(
392  
            auto prev = op.registered.exchange(
392  
                select_registration_state::unregistered,
393  
                select_registration_state::unregistered,
393  
                std::memory_order_acq_rel);
394  
                std::memory_order_acq_rel);
394  
            if (prev != select_registration_state::unregistered)
395  
            if (prev != select_registration_state::unregistered)
395  
            {
396  
            {
396  
                svc_.scheduler().deregister_fd(
397  
                svc_.scheduler().deregister_fd(
397  
                    fd_, select_scheduler::event_read);
398  
                    fd_, select_scheduler::event_read);
398  
                op.impl_ptr = shared_from_this();
399  
                op.impl_ptr = shared_from_this();
399  
                svc_.post(&op);
400  
                svc_.post(&op);
400  
                svc_.work_finished();
401  
                svc_.work_finished();
401  
            }
402  
            }
402  
        }
403  
        }
403  
        return std::noop_coroutine();
404  
        return std::noop_coroutine();
404  
    }
405  
    }
405  

406  

406  
    op.complete(errno, 0);
407  
    op.complete(errno, 0);
407  
    op.impl_ptr = shared_from_this();
408  
    op.impl_ptr = shared_from_this();
408  
    svc_.post(&op);
409  
    svc_.post(&op);
409  
    return std::noop_coroutine();
410  
    return std::noop_coroutine();
410  
}
411  
}
411  

412  

412  
// Initiate a write of the caller's buffer sequence on this socket.
//
// Fast path: attempt an immediate non-blocking ::sendmsg(). On success (or a
// hard error) the op is completed synchronously and posted to the scheduler,
// so the awaiting coroutine is resumed from the run loop rather than inline.
// Only EAGAIN/EWOULDBLOCK takes the slow path that registers the fd for write
// readiness with the select reactor.
//
// h         - coroutine to resume on completion
// ex        - executor recorded on the op for completion dispatch
// param     - buffer sequence; copied into op.iovecs (bounded by max_buffers)
// token     - stop token wired to per-op cancellation via op.start()
// ec        - out: error code, written when the op completes
// bytes_out - out: bytes transferred, written when the op completes
//
// Always returns std::noop_coroutine(): completion is delivered through the
// scheduler queue, never by symmetric transfer from here.
inline std::coroutine_handle<>
select_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = wr_;
    op.reset();
    op.h         = h;
    op.ex        = ex;
    op.ec_out    = ec;
    op.bytes_out = bytes_out;
    op.fd        = fd_;
    // Arms the stop_token callback; cancellation may now race with the
    // registration sequence below.
    op.start(token, this);

    // NOTE(review): the write path stages through capy::mutable_buffer —
    // presumably buffer_param::copy_to only hands out mutable views; confirm
    // const-correctness upstream.
    capy::mutable_buffer bufs[select_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));

    // Zero-length write: complete immediately with success and 0 bytes.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len  = bufs[i].size();
    }

    msghdr msg{};
    msg.msg_iov    = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    // MSG_NOSIGNAL: report EPIPE instead of raising SIGPIPE on a closed peer.
    ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);

    if (n > 0)
    {
        // Partial writes are fine: report what was sent; the caller retries.
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        // Slow path: park the op in the reactor until the fd is writable.
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            // exchange() claims the op exactly once; whoever observes a
            // non-unregistered previous state owns deregistration + post.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                // Balances the work_started() above.
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // Hard error. Guard against a zero errno (e.g. sendmsg returned 0)
    // by falling back to EIO.
    op.complete(errno ? errno : EIO, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
509  

510  

510  
// Disable further receives and/or sends on the connected socket by mapping
// the portable shutdown_type onto the POSIX ::shutdown() "how" argument.
// Returns EINVAL for an unrecognized enumerator, otherwise the errno from
// ::shutdown() on failure, or a default (success) error_code.
inline std::error_code
select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
{
    int how;
    if (what == tcp_socket::shutdown_receive)
        how = SHUT_RD;
    else if (what == tcp_socket::shutdown_send)
        how = SHUT_WR;
    else if (what == tcp_socket::shutdown_both)
        how = SHUT_RDWR;
    else
        return make_err(EINVAL);

    if (::shutdown(fd_, how) == 0)
        return {};
    return make_err(errno);
}
532  

533  

533  
// Thin wrapper over ::setsockopt(). The caller supplies the raw option
// payload; size is narrowed to socklen_t as the POSIX API requires.
// Returns the errno-derived error on failure, default error_code on success.
inline std::error_code
select_socket::set_option(
    int level, int optname, void const* data, std::size_t size) noexcept
{
    auto const len = static_cast<socklen_t>(size);
    int const rc   = ::setsockopt(fd_, level, optname, data, len);
    if (rc == 0)
        return {};
    return make_err(errno);
}
543  

543  

544  
// Thin wrapper over ::getsockopt(). On entry *size is the capacity of the
// buffer at data; on successful return it is updated to the actual option
// length reported by the kernel.
inline std::error_code
select_socket::get_option(
    int level, int optname, void* data, std::size_t* size) const noexcept
{
    auto len     = static_cast<socklen_t>(*size);
    int const rc = ::getsockopt(fd_, level, optname, data, &len);
    if (rc != 0)
        return make_err(errno);
    *size = static_cast<std::size_t>(len); // report actual length back
    return {};
}
555  

554  

556  
// Cancel all pending operations (connect, read, write) on this socket.
//
// Each op is claimed via an atomic exchange on its registration state: only
// the caller that observes a previous state other than `unregistered` owns
// the follow-up work (deregistering the fd, posting the op back to the
// scheduler, and releasing the outstanding-work count). This makes the race
// between cancel, the reactor, and the initiating thread resolve to exactly
// one completion per op.
inline void
select_socket::cancel() noexcept
{
    // If the impl is already being destroyed there is nothing to keep alive
    // and nothing left to cancel.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    auto cancel_op = [this, &self](select_op& op, int events) {
        // Claim: whoever flips the state away from unregistered wins.
        auto prev = op.registered.exchange(
            select_registration_state::unregistered, std::memory_order_acq_rel);
        // Mark cancelled regardless, so a concurrent initiator that is still
        // mid-registration observes it and cleans up (see write_some).
        op.request_cancel();
        if (prev != select_registration_state::unregistered)
        {
            svc_.scheduler().deregister_fd(fd_, events);
            // Keep the impl alive until the posted op is drained.
            op.impl_ptr = self;
            svc_.post(&op);
            // Balances the work_started() done when the op was parked.
            svc_.work_finished();
        }
    };

    // connect waits for writability, hence event_write for conn_.
    cancel_op(conn_, select_scheduler::event_write);
    cancel_op(rd_, select_scheduler::event_read);
    cancel_op(wr_, select_scheduler::event_write);
}
580  

579  

581  
// Cancel one specific pending operation on this socket.
//
// Uses the same single-claim protocol as cancel(): an atomic exchange on the
// op's registration state decides which party (this caller, the reactor, or
// the initiator) performs the deregister/post/work-finished sequence.
inline void
select_socket::cancel_single_op(select_op& op) noexcept
{
    // Bail out if the impl is already being torn down.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    // Called from stop_token callback to cancel a specific pending operation.
    auto prev = op.registered.exchange(
        select_registration_state::unregistered, std::memory_order_acq_rel);
    // Always set the cancelled flag so an initiator racing through
    // registration (see write_some) notices and completes the op itself.
    op.request_cancel();

    if (prev != select_registration_state::unregistered)
    {
        // Determine which event type to deregister
        int events = 0;
        if (&op == &conn_ || &op == &wr_)
            events = select_scheduler::event_write;
        else if (&op == &rd_)
            events = select_scheduler::event_read;

        svc_.scheduler().deregister_fd(fd_, events);

        // Keep the impl alive until the posted (now-cancelled) op is drained.
        op.impl_ptr = self;
        svc_.post(&op);
        // Balances the work_started() done when the op was parked.
        svc_.work_finished();
    }
}
609  

608  

610  
// Close the socket: cancel every pending op, deregister the fd from the
// reactor, close the descriptor, and reset the cached endpoints.
//
// The per-op cancellation uses the same single-claim exchange protocol as
// cancel(); closing also works when the impl is mid-destruction (weak self
// lock fails), in which case only the fd teardown is performed.
inline void
select_socket::close_socket() noexcept
{
    auto self = weak_from_this().lock();
    if (self)
    {
        auto cancel_op = [this, &self](select_op& op, int events) {
            // Claim the op; only the winner posts the completion.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            op.request_cancel();
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(fd_, events);
                // Keep the impl alive until the posted op is drained.
                op.impl_ptr = self;
                svc_.post(&op);
                // Balances the work_started() from the parked op.
                svc_.work_finished();
            }
        };

        // connect waits on writability, hence event_write for conn_.
        cancel_op(conn_, select_scheduler::event_write);
        cancel_op(rd_, select_scheduler::event_read);
        cancel_op(wr_, select_scheduler::event_write);
    }

    if (fd_ >= 0)
    {
        // Belt-and-braces: drop any remaining reactor interest in both
        // directions before closing the descriptor.
        svc_.scheduler().deregister_fd(
            fd_, select_scheduler::event_read | select_scheduler::event_write);
        ::close(fd_);
        fd_ = -1;
    }

    local_endpoint_  = endpoint{};
    remote_endpoint_ = endpoint{};
}
646  

645  

647  
// Construct the service. The shared state lives behind a unique_ptr so its
// address is stable for the ops and sockets that reference it; it captures
// the context's select_scheduler service obtained via use_service<>().
inline select_socket_service::select_socket_service(
    capy::execution_context& ctx)
    : state_(std::make_unique<select_socket_state>(
          ctx.use_service<select_scheduler>()))
{
}
654  

653  

655  
// Defaulted out of line; teardown of the owned state happens via ~state_.
inline select_socket_service::~select_socket_service() = default;
656  

655  

657  
inline void
656  
inline void
658  
select_socket_service::shutdown()
657  
select_socket_service::shutdown()
659  
{
658  
{
660  
    std::lock_guard lock(state_->mutex_);
659  
    std::lock_guard lock(state_->mutex_);
661  

660  

662  
    while (auto* impl = state_->socket_list_.pop_front())
661  
    while (auto* impl = state_->socket_list_.pop_front())
663  
        impl->close_socket();
662  
        impl->close_socket();
664  

663  

665  
    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
664  
    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
666  
    // drains completed_ops_, calling destroy() on each queued op. Letting
665  
    // drains completed_ops_, calling destroy() on each queued op. Letting
667  
    // ~state_ release the ptrs (during service destruction, after scheduler
666  
    // ~state_ release the ptrs (during service destruction, after scheduler
668  
    // shutdown) keeps every impl alive until all ops have been drained.
667  
    // shutdown) keeps every impl alive until all ops have been drained.
669  
}
668  
}
670  

669  

671  
// Create a new socket implementation, register it with the service's
// bookkeeping (intrusive list + owning map), and hand back a non-owning
// pointer; ownership stays with socket_ptrs_.
inline io_object::implementation*
select_socket_service::construct()
{
    auto socket        = std::make_shared<select_socket>(*this);
    select_socket* raw = socket.get();

    std::lock_guard lock(state_->mutex_);
    state_->socket_list_.push_back(raw);
    state_->socket_ptrs_.emplace(raw, std::move(socket));
    return raw;
}
685  

684  

686  
inline void
685  
inline void
687  
select_socket_service::destroy(io_object::implementation* impl)
686  
select_socket_service::destroy(io_object::implementation* impl)
688  
{
687  
{
689  
    auto* select_impl = static_cast<select_socket*>(impl);
688  
    auto* select_impl = static_cast<select_socket*>(impl);
690  
    select_impl->close_socket();
689  
    select_impl->close_socket();
691  
    std::lock_guard lock(state_->mutex_);
690  
    std::lock_guard lock(state_->mutex_);
692  
    state_->socket_list_.remove(select_impl);
691  
    state_->socket_list_.remove(select_impl);
693  
    state_->socket_ptrs_.erase(select_impl);
692  
    state_->socket_ptrs_.erase(select_impl);
694  
}
693  
}
695  

694  

696  
inline std::error_code
695  
inline std::error_code
697  
select_socket_service::open_socket(
696  
select_socket_service::open_socket(
698 -
    tcp_socket::implementation& impl,
697 +
    tcp_socket::implementation& impl, int family, int type, int protocol)
699 -
    int family, int type, int protocol)
 
700  
{
698  
{
701  
    auto* select_impl = static_cast<select_socket*>(&impl);
699  
    auto* select_impl = static_cast<select_socket*>(&impl);
702  
    select_impl->close_socket();
700  
    select_impl->close_socket();
703  

701  

704  
    int fd = ::socket(family, type, protocol);
702  
    int fd = ::socket(family, type, protocol);
705  
    if (fd < 0)
703  
    if (fd < 0)
706  
        return make_err(errno);
704  
        return make_err(errno);
707  

705  

708  
    if (family == AF_INET6)
706  
    if (family == AF_INET6)
709  
    {
707  
    {
710  
        int one = 1;
708  
        int one = 1;
711  
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
709  
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
712  
    }
710  
    }
713  

711  

714  
    // Set non-blocking and close-on-exec
712  
    // Set non-blocking and close-on-exec
715  
    int flags = ::fcntl(fd, F_GETFL, 0);
713  
    int flags = ::fcntl(fd, F_GETFL, 0);
716  
    if (flags == -1)
714  
    if (flags == -1)
717  
    {
715  
    {
718  
        int errn = errno;
716  
        int errn = errno;
719  
        ::close(fd);
717  
        ::close(fd);
720  
        return make_err(errn);
718  
        return make_err(errn);
721  
    }
719  
    }
722  
    if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
720  
    if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
723  
    {
721  
    {
724  
        int errn = errno;
722  
        int errn = errno;
725  
        ::close(fd);
723  
        ::close(fd);
726  
        return make_err(errn);
724  
        return make_err(errn);
727  
    }
725  
    }
728  
    if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
726  
    if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
729  
    {
727  
    {
730  
        int errn = errno;
728  
        int errn = errno;
731  
        ::close(fd);
729  
        ::close(fd);
732  
        return make_err(errn);
730  
        return make_err(errn);
733  
    }
731  
    }
734  

732  

735  
    // Check fd is within select() limits
733  
    // Check fd is within select() limits
736  
    if (fd >= FD_SETSIZE)
734  
    if (fd >= FD_SETSIZE)
737  
    {
735  
    {
738  
        ::close(fd);
736  
        ::close(fd);
739  
        return make_err(EMFILE); // Too many open files
737  
        return make_err(EMFILE); // Too many open files
740  
    }
738  
    }
741  

739  

742  
    select_impl->fd_ = fd;
740  
    select_impl->fd_ = fd;
743  
    return {};
741  
    return {};
744  
}
742  
}
745  

743  

746  
inline void
744  
inline void
747  
select_socket_service::close(io_object::handle& h)
745  
select_socket_service::close(io_object::handle& h)
748  
{
746  
{
749  
    static_cast<select_socket*>(h.get())->close_socket();
747  
    static_cast<select_socket*>(h.get())->close_socket();
750  
}
748  
}
751  

749  

752  
inline void
750  
inline void
753  
select_socket_service::post(select_op* op)
751  
select_socket_service::post(select_op* op)
754  
{
752  
{
755  
    state_->sched_.post(op);
753  
    state_->sched_.post(op);
756  
}
754  
}
757  

755  

758  
inline void
756  
inline void
759  
select_socket_service::work_started() noexcept
757  
select_socket_service::work_started() noexcept
760  
{
758  
{
761  
    state_->sched_.work_started();
759  
    state_->sched_.work_started();
762  
}
760  
}
763  

761  

764  
inline void
762  
inline void
765  
select_socket_service::work_finished() noexcept
763  
select_socket_service::work_finished() noexcept
766  
{
764  
{
767  
    state_->sched_.work_finished();
765  
    state_->sched_.work_finished();
768  
}
766  
}
769  

767  

770  
} // namespace boost::corosio::detail
768  
} // namespace boost::corosio::detail
771  

769  

772  
#endif // BOOST_COROSIO_HAS_SELECT
770  
#endif // BOOST_COROSIO_HAS_SELECT
773  

771  

774  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
772  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP