1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/corosio
7  
// Official repository: https://github.com/cppalliance/corosio
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_EPOLL
15  
#if BOOST_COROSIO_HAS_EPOLL
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  
#include <boost/corosio/detail/socket_service.hpp>
19  
#include <boost/corosio/detail/socket_service.hpp>
20  

20  

21  
#include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
21  
#include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
22  
#include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
22  
#include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
23  

23  

24 -
#include <boost/corosio/detail/endpoint_convert.hpp>
24 +
#include <boost/corosio/native/detail/endpoint_convert.hpp>
25 -
#include <boost/corosio/detail/make_err.hpp>
25 +
#include <boost/corosio/native/detail/make_err.hpp>
26  
#include <boost/corosio/detail/dispatch_coro.hpp>
26  
#include <boost/corosio/detail/dispatch_coro.hpp>
27  
#include <boost/corosio/detail/except.hpp>
27  
#include <boost/corosio/detail/except.hpp>
28  
#include <boost/capy/buffers.hpp>
28  
#include <boost/capy/buffers.hpp>
29  

29  

30  
#include <coroutine>
30  
#include <coroutine>
31  
#include <mutex>
31  
#include <mutex>
32  
#include <unordered_map>
32  
#include <unordered_map>
33  
#include <utility>
33  
#include <utility>
34  

34  

35  
#include <errno.h>
35  
#include <errno.h>
36  
#include <netinet/in.h>
36  
#include <netinet/in.h>
37  
#include <netinet/tcp.h>
37  
#include <netinet/tcp.h>
38  
#include <sys/epoll.h>
38  
#include <sys/epoll.h>
39  
#include <sys/socket.h>
39  
#include <sys/socket.h>
40  
#include <unistd.h>
40  
#include <unistd.h>
41  

41  

/*
    epoll Socket Implementation
    ===========================

    Each I/O operation follows the same pattern:
      1. Try the syscall immediately (non-blocking socket)
      2. If it succeeds or fails with a real error, post to completion queue
      3. If EAGAIN/EWOULDBLOCK, register with epoll and wait

    This "try first" approach avoids unnecessary epoll round-trips for
    operations that can complete immediately (common for small reads/writes
    on fast local connections).

    One-Shot Registration
    ---------------------
    We use one-shot epoll registration: each operation registers, waits for
    one event, then unregisters. This simplifies the state machine since we
    don't need to track whether an fd is currently registered or handle
    re-arming. The tradeoff is slightly more epoll_ctl calls, but the
    simplicity is worth it.

    Cancellation
    ------------
    See op.hpp for the completion/cancellation race handling via the
    `registered` atomic. cancel() must complete pending operations (post
    them with the cancelled flag set) so coroutines waiting on them can
    resume. close_socket() calls cancel() first to ensure this.

    Impl Lifetime with shared_ptr
    -----------------------------
    Socket impls use enable_shared_from_this. The service owns impls via
    shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
    removal. When a user calls close(), we call cancel() which posts pending
    ops to the scheduler.

    CRITICAL: The posted ops must keep the impl alive until they complete.
    Otherwise the scheduler would process a freed op (use-after-free). The
    cancel() method captures shared_from_this() into op.impl_ptr before
    posting. When the op completes, impl_ptr is cleared, allowing the impl
    to be destroyed if no other references exist.

    Service Ownership
    -----------------
    epoll_socket_service owns all socket impls. destroy_impl() removes the
    shared_ptr from the map, but the impl may survive if ops still hold
    impl_ptr refs. shutdown() closes all sockets and clears the map; any
    in-flight ops will complete and release their refs.
*/
90  

90  

91  
namespace boost::corosio::detail {
91  
namespace boost::corosio::detail {
92  

92  

/** State for epoll socket service.

    Bundles the scheduler reference with the containers that track every
    socket impl owned by the service (see the file comment on impl
    lifetime). mutex_ presumably guards socket_list_ and socket_ptrs_ —
    confirm against the service implementation.
*/
class epoll_socket_state
{
public:
    explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
    {
    }

    // Scheduler used to post completions and account for outstanding work.
    epoll_scheduler& sched_;
    // Protects the socket containers below.
    std::mutex mutex_;
    // Intrusive list of live socket impls.
    intrusive_list<epoll_socket> socket_list_;
    // Owning map keyed by raw pointer for O(1) lookup and removal.
    std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
        socket_ptrs_;
};
107  

107  

/** epoll socket service implementation.

    Inherits from socket_service to enable runtime polymorphism.
    Uses key_type = socket_service for service lookup.
*/
class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
{
public:
    explicit epoll_socket_service(capy::execution_context& ctx);
    ~epoll_socket_service() override;

    epoll_socket_service(epoll_socket_service const&)            = delete;
    epoll_socket_service& operator=(epoll_socket_service const&) = delete;

    /// Close all sockets and release owned impls (see file comment).
    void shutdown() override;

    /// Create a new socket impl owned by this service.
    io_object::implementation* construct() override;
    void destroy(io_object::implementation*) override;
    void close(io_object::handle&) override;
    /// Open a native socket for `impl` with the given family/type/protocol.
    std::error_code open_socket(
        tcp_socket::implementation& impl,
        int family,
        int type,
        int protocol) override;

    /// Scheduler that runs completions for this service.
    epoll_scheduler& scheduler() const noexcept
    {
        return state_->sched_;
    }
    /// Hand a completed op to the scheduler's completion queue.
    void post(epoll_op* op);
    /// Work accounting: paired start/finish around pending operations.
    void work_started() noexcept;
    void work_finished() noexcept;

private:
    std::unique_ptr<epoll_socket_state> state_;
};
142  

144  

143  
//--------------------------------------------------------------------------
145  
//--------------------------------------------------------------------------
144  
//
146  
//
145  
// Implementation
147  
// Implementation
146  
//
148  
//
147  
//--------------------------------------------------------------------------
149  
//--------------------------------------------------------------------------
148  

150  

// Register an op with the reactor, handling cached edge events.
// Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
//
// `desc_slot` is the per-descriptor pointer the reactor reads on the next
// event; `ready_flag`/`cancel_flag` are the cached readiness/cancel bits
// for this operation direction. All of them are mutated only under
// desc_state_.mutex.
inline void
epoll_socket::register_op(
    epoll_op& op,
    epoll_op*& desc_slot,
    bool& ready_flag,
    bool& cancel_flag) noexcept
{
    // Balanced by work_finished() below when the op completes here;
    // otherwise the reactor path is responsible for finishing the work.
    svc_.work_started();

    std::lock_guard lock(desc_state_.mutex);
    bool io_done = false;
    if (ready_flag)
    {
        // An edge event was cached before we registered: consume it and
        // retry the I/O now instead of waiting for another event.
        ready_flag = false;
        op.perform_io();
        io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
        if (!io_done)
            op.errn = 0; // would-block again: clear errno, keep waiting
    }

    if (cancel_flag)
    {
        // A cancel raced with registration: latch it into the op.
        cancel_flag = false;
        op.cancelled.store(true, std::memory_order_relaxed);
    }

    if (io_done || op.cancelled.load(std::memory_order_acquire))
    {
        // Finished (or cancelled) without the reactor: post completion.
        svc_.post(&op);
        svc_.work_finished();
    }
    else
    {
        // Park the op; the reactor completes it on the next event.
        desc_slot = &op;
    }
}
187  

189  

// Stop-token callback: forwards the cancellation request to the op.
inline void
epoll_op::canceller::operator()() const noexcept
{
    op->cancel();
}
193  

195  

194  
inline void
196  
inline void
195  
epoll_connect_op::cancel() noexcept
197  
epoll_connect_op::cancel() noexcept
196  
{
198  
{
197  
    if (socket_impl_)
199  
    if (socket_impl_)
198  
        socket_impl_->cancel_single_op(*this);
200  
        socket_impl_->cancel_single_op(*this);
199  
    else
201  
    else
200  
        request_cancel();
202  
        request_cancel();
201  
}
203  
}
202  

204  

203  
inline void
205  
inline void
204  
epoll_read_op::cancel() noexcept
206  
epoll_read_op::cancel() noexcept
205  
{
207  
{
206  
    if (socket_impl_)
208  
    if (socket_impl_)
207  
        socket_impl_->cancel_single_op(*this);
209  
        socket_impl_->cancel_single_op(*this);
208  
    else
210  
    else
209  
        request_cancel();
211  
        request_cancel();
210  
}
212  
}
211  

213  

212  
inline void
214  
inline void
213  
epoll_write_op::cancel() noexcept
215  
epoll_write_op::cancel() noexcept
214  
{
216  
{
215  
    if (socket_impl_)
217  
    if (socket_impl_)
216  
        socket_impl_->cancel_single_op(*this);
218  
        socket_impl_->cancel_single_op(*this);
217  
    else
219  
    else
218  
        request_cancel();
220  
        request_cancel();
219  
}
221  
}
220  

222  

// Completion handler for read/write ops: translate the op's recorded
// outcome into (*ec_out, *bytes_out) and resume the waiting coroutine on
// its executor.
inline void
epoll_op::operator()()
{
    // Detach the stop callback first so a late stop request cannot
    // re-enter cancel() while we complete.
    stop_cb.reset();

    socket_impl_->svc_.scheduler().reset_inline_budget();

    // Cancellation wins over any saved errno; a successful zero-byte
    // read maps to EOF.
    if (cancelled.load(std::memory_order_acquire))
        *ec_out = capy::error::canceled;
    else if (errn != 0)
        *ec_out = make_err(errn);
    else if (is_read_operation() && bytes_transferred == 0)
        *ec_out = capy::error::eof;
    else
        *ec_out = {};

    *bytes_out = bytes_transferred;

    // Move to stack before resuming coroutine. The coroutine might close
    // the socket, releasing the last wrapper ref. If impl_ptr were the
    // last ref and we destroyed it while still in operator(), we'd have
    // use-after-free. Moving to local ensures destruction happens at
    // function exit, after all member accesses are complete.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    auto prevent_premature_destruction = std::move(impl_ptr);
    dispatch_coro(saved_ex, saved_h).resume();
}
249  

251  

// Completion handler for connect ops: cache endpoints on success, set
// *ec_out, and resume the waiting coroutine on its executor.
inline void
epoll_connect_op::operator()()
{
    // Detach the stop callback first so a late stop request cannot
    // re-enter cancel() while we complete.
    stop_cb.reset();

    socket_impl_->svc_.scheduler().reset_inline_budget();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect
    if (success && socket_impl_)
    {
        endpoint local_ep;
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        // Best effort: if getsockname fails, local_ep stays default.
        if (::getsockname(
                fd, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
            0)
            local_ep = from_sockaddr(local_storage);
        static_cast<epoll_socket*>(socket_impl_)
            ->set_endpoints(local_ep, target_endpoint);
    }

    // Cancellation wins over any saved errno.
    if (cancelled.load(std::memory_order_acquire))
        *ec_out = capy::error::canceled;
    else if (errn != 0)
        *ec_out = make_err(errn);
    else
        *ec_out = {};

    // Move to stack before resuming. See epoll_op::operator()() for rationale.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    auto prevent_premature_destruction = std::move(impl_ptr);
    dispatch_coro(saved_ex, saved_h).resume();
}
286  

288  

// Binds the impl to its owning service; the fd is opened later via
// open_socket().
inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
    : svc_(svc)
{
}
291  

293  

// Defaulted: cleanup is driven by close/cancel paths, not the destructor.
inline epoll_socket::~epoll_socket() = default;
293  

295  

// Start an asynchronous connect to `ep`.
//
// Tries ::connect() immediately. If it finishes (success or a real
// error), the coroutine is resumed either inline (budget permitting) or
// via a posted completion. On EINPROGRESS the connect op is registered
// with the reactor and completed when the socket becomes writable.
// Returns the coroutine to run next (noop when completion is deferred).
inline std::coroutine_handle<>
epoll_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;

    sockaddr_storage storage{};
    socklen_t addrlen =
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
    int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);

    if (result == 0)
    {
        // Immediate success: cache local/remote endpoints now.
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        if (::getsockname(
                fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
            0)
            local_endpoint_ = detail::from_sockaddr(local_storage);
        remote_endpoint_ = ep;
    }

    if (result == 0 || errno != EINPROGRESS)
    {
        // Completed synchronously (success or hard error).
        int err = (result < 0) ? errno : 0;
        if (svc_.scheduler().try_consume_inline_budget())
        {
            // Resume inline without a queue round-trip.
            *ec = err ? make_err(err) : std::error_code{};
            return dispatch_coro(ex, h);
        }
        // Budget exhausted: defer via the completion queue for fairness.
        op.reset();
        op.h               = h;
        op.ex              = ex;
        op.ec_out          = ec;
        op.fd              = fd_;
        op.target_endpoint = ep;
        op.start(token, this);
        // Keep the impl alive until the posted op runs (see file comment).
        op.impl_ptr = shared_from_this();
        op.complete(err, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EINPROGRESS — register with reactor
    op.reset();
    op.h               = h;
    op.ex              = ex;
    op.ec_out          = ec;
    op.fd              = fd_;
    op.target_endpoint = ep;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    // Connect completion is signaled by writability.
    register_op(
        op, desc_state_.connect_op, desc_state_.write_ready,
        desc_state_.connect_cancel_pending);
    return std::noop_coroutine();
}
357  

358  

// Start an asynchronous scatter read into `param`'s buffers.
//
// Empty buffer sets complete immediately with zero bytes. Otherwise a
// speculative ::readv() runs first; only on EAGAIN/EWOULDBLOCK is the
// read op registered with the reactor. Returns the coroutine to run
// next (noop when completion is deferred).
inline std::coroutine_handle<>
epoll_socket::read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = rd_;
    op.reset();

    capy::mutable_buffer bufs[epoll_read_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));

    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        // Empty read: complete with 0 bytes. empty_buffer_read
        // presumably suppresses the zero-bytes==EOF mapping in the
        // completion handler — confirm in op.hpp.
        op.empty_buffer_read = true;
        op.h                 = h;
        op.ex                = ex;
        op.ec_out            = ec;
        op.bytes_out         = bytes_out;
        op.start(token, this);
        // Keep the impl alive until the posted op runs (see file comment).
        op.impl_ptr = shared_from_this();
        op.complete(0, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len  = bufs[i].size();
    }

    // Speculative read, retrying on signal interruption.
    ssize_t n;
    do
    {
        n = ::readv(fd_, op.iovecs, op.iovec_count);
    }
    while (n < 0 && errno == EINTR);

    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
    {
        // Completed synchronously (data, EOF, or a hard error).
        int err    = (n < 0) ? errno : 0;
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);

        if (svc_.scheduler().try_consume_inline_budget())
        {
            // Resume inline without a queue round-trip.
            if (err)
                *ec = make_err(err);
            else if (n == 0)
                *ec = capy::error::eof;
            else
                *ec = {};
            *bytes_out = bytes;
            return dispatch_coro(ex, h);
        }
        // Budget exhausted: defer via the completion queue for fairness.
        op.h         = h;
        op.ex        = ex;
        op.ec_out    = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(err, bytes);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EAGAIN — register with reactor
    op.h         = h;
    op.ex        = ex;
    op.ec_out    = ec;
    op.bytes_out = bytes_out;
    op.fd        = fd_;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    register_op(
        op, desc_state_.read_op, desc_state_.read_ready,
        desc_state_.read_cancel_pending);
    return std::noop_coroutine();
}
443  

444  

444  
/// Initiate a (possibly speculative) scatter-gather write on the socket.
///
/// Attempts the send immediately; only registers with the epoll reactor
/// when the kernel reports EAGAIN/EWOULDBLOCK. Returns the coroutine to
/// resume next (noop when completion is delivered through the scheduler).
inline std::coroutine_handle<>
epoll_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    // One outstanding write per socket: reuse the embedded write op.
    auto& op = wr_;
    op.reset();

    // Gather caller buffers, bounded by the op's fixed iovec capacity.
    capy::mutable_buffer bufs[epoll_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));

    // Zero-length write: complete immediately with no error and 0 bytes,
    // but still round-trip through the scheduler queue so completion is
    // delivered like any other asynchronous result.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.h         = h;
        op.ex        = ex;
        op.ec_out    = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(0, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len  = bufs[i].size();
    }

    // Speculative write: try sendmsg() before touching the reactor; on a
    // writable socket this avoids an epoll round trip entirely.
    msghdr msg{};
    msg.msg_iov    = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    ssize_t n;
    do
    {
        // MSG_NOSIGNAL: report EPIPE instead of raising SIGPIPE.
        n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
    }
    while (n < 0 && errno == EINTR); // retry if interrupted by a signal

    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
    {
        // Completed synchronously — either success or a hard error.
        int err    = (n < 0) ? errno : 0;
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);

        // Inline fast path, rationed by the scheduler's budget so repeated
        // synchronous completions cannot monopolize the thread.
        if (svc_.scheduler().try_consume_inline_budget())
        {
            *ec        = err ? make_err(err) : std::error_code{};
            *bytes_out = bytes;
            return dispatch_coro(ex, h);
        }
        // Budget exhausted: deliver the already-known result through the
        // scheduler queue instead of resuming inline.
        op.h         = h;
        op.ex        = ex;
        op.ec_out    = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(err, bytes);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EAGAIN — register with reactor
    op.h         = h;
    op.ex        = ex;
    op.ec_out    = ec;
    op.bytes_out = bytes_out;
    op.fd        = fd_;
    op.start(token, this);
    // Keep this impl alive until the op completes and is drained.
    op.impl_ptr = shared_from_this();

    register_op(
        op, desc_state_.write_op, desc_state_.write_ready,
        desc_state_.write_cancel_pending);
    return std::noop_coroutine();
}
527  

528  

528  
inline std::error_code
529  
inline std::error_code
529  
epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
530  
epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
530  
{
531  
{
531  
    int how;
532  
    int how;
532  
    switch (what)
533  
    switch (what)
533  
    {
534  
    {
534  
    case tcp_socket::shutdown_receive:
535  
    case tcp_socket::shutdown_receive:
535  
        how = SHUT_RD;
536  
        how = SHUT_RD;
536  
        break;
537  
        break;
537  
    case tcp_socket::shutdown_send:
538  
    case tcp_socket::shutdown_send:
538  
        how = SHUT_WR;
539  
        how = SHUT_WR;
539  
        break;
540  
        break;
540  
    case tcp_socket::shutdown_both:
541  
    case tcp_socket::shutdown_both:
541  
        how = SHUT_RDWR;
542  
        how = SHUT_RDWR;
542  
        break;
543  
        break;
543  
    default:
544  
    default:
544  
        return make_err(EINVAL);
545  
        return make_err(EINVAL);
545  
    }
546  
    }
546  
    if (::shutdown(fd_, how) != 0)
547  
    if (::shutdown(fd_, how) != 0)
547  
        return make_err(errno);
548  
        return make_err(errno);
548  
    return {};
549  
    return {};
549  
}
550  
}
550  

551  

551  
inline std::error_code
552  
inline std::error_code
552  
epoll_socket::set_option(
553  
epoll_socket::set_option(
553 -
    int level, int optname,
554 +
    int level, int optname, void const* data, std::size_t size) noexcept
554 -
    void const* data, std::size_t size) noexcept
 
555  
{
555  
{
556 -
    if (::setsockopt(fd_, level, optname, data,
556 +
    if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
557 -
            static_cast<socklen_t>(size)) != 0)
557 +
        0)
558  
        return make_err(errno);
558  
        return make_err(errno);
559  
    return {};
559  
    return {};
560  
}
560  
}
561  

561  

562  
inline std::error_code
562  
inline std::error_code
563  
epoll_socket::get_option(
563  
epoll_socket::get_option(
564 -
    int level, int optname,
564 +
    int level, int optname, void* data, std::size_t* size) const noexcept
565 -
    void* data, std::size_t* size) const noexcept
 
566  
{
565  
{
567  
    socklen_t len = static_cast<socklen_t>(*size);
566  
    socklen_t len = static_cast<socklen_t>(*size);
568  
    if (::getsockopt(fd_, level, optname, data, &len) != 0)
567  
    if (::getsockopt(fd_, level, optname, data, &len) != 0)
569  
        return make_err(errno);
568  
        return make_err(errno);
570  
    *size = static_cast<std::size_t>(len);
569  
    *size = static_cast<std::size_t>(len);
571  
    return {};
570  
    return {};
572  
}
571  
}
573  

572  

574  
/// Cancel all outstanding operations (connect, read, write) on this socket.
inline void
epoll_socket::cancel() noexcept
{
    // If the impl is already past its last shared owner there is nothing
    // left to cancel safely.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    // Flag every op cancelled first so concurrent completions observe it.
    conn_.request_cancel();
    rd_.request_cancel();
    wr_.request_cancel();

    // Under the descriptor lock, claim each op out of its reactor slot.
    // An op we cannot claim is currently owned by the reactor/completion
    // path — leave a cancel-pending flag for that path to honor instead.
    epoll_op* conn_claimed = nullptr;
    epoll_op* rd_claimed   = nullptr;
    epoll_op* wr_claimed   = nullptr;
    {
        std::lock_guard lock(desc_state_.mutex);
        if (desc_state_.connect_op == &conn_)
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
        else
            desc_state_.connect_cancel_pending = true;
        if (desc_state_.read_op == &rd_)
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
        else
            desc_state_.read_cancel_pending = true;
        if (desc_state_.write_op == &wr_)
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
        else
            desc_state_.write_cancel_pending = true;
    }

    // Post each claimed op for completion delivery. impl_ptr pins this
    // impl until the posted op drains; work_finished() is presumably
    // balancing the work count taken at registration — TODO confirm
    // against register_op.
    if (conn_claimed)
    {
        conn_.impl_ptr = self;
        svc_.post(&conn_);
        svc_.work_finished();
    }
    if (rd_claimed)
    {
        rd_.impl_ptr = self;
        svc_.post(&rd_);
        svc_.work_finished();
    }
    if (wr_claimed)
    {
        wr_.impl_ptr = self;
        svc_.post(&wr_);
        svc_.work_finished();
    }
}
623  

622  

624  
/// Cancel a single outstanding operation (one of conn_/rd_/wr_).
inline void
epoll_socket::cancel_single_op(epoll_op& op) noexcept
{
    // Bail out if the impl is already in teardown.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    op.request_cancel();

    // Locate the descriptor-state slot corresponding to this op; a foreign
    // op (none of ours) leaves desc_op_ptr null and is ignored.
    epoll_op** desc_op_ptr = nullptr;
    if (&op == &conn_)
        desc_op_ptr = &desc_state_.connect_op;
    else if (&op == &rd_)
        desc_op_ptr = &desc_state_.read_op;
    else if (&op == &wr_)
        desc_op_ptr = &desc_state_.write_op;

    if (desc_op_ptr)
    {
        // Claim the op under the lock; if the reactor/completion path got
        // there first, record a pending cancel for it to act on instead.
        epoll_op* claimed = nullptr;
        {
            std::lock_guard lock(desc_state_.mutex);
            if (*desc_op_ptr == &op)
                claimed = std::exchange(*desc_op_ptr, nullptr);
            else if (&op == &conn_)
                desc_state_.connect_cancel_pending = true;
            else if (&op == &rd_)
                desc_state_.read_cancel_pending = true;
            else if (&op == &wr_)
                desc_state_.write_cancel_pending = true;
        }
        if (claimed)
        {
            // Deliver the cancelled op through the scheduler; impl_ptr
            // keeps this impl alive until the op is drained.
            op.impl_ptr = self;
            svc_.post(&op);
            svc_.work_finished();
        }
    }
}
663  

662  

664  
/// Tear down the socket: cancel and flush all pending ops, deregister the
/// fd from epoll, close it, and reset descriptor state and cached endpoints.
inline void
epoll_socket::close_socket() noexcept
{
    // A live shared_ptr means ops may still reference this impl; cancel
    // and flush them before tearing down the descriptor. (During final
    // destruction weak_from_this() no longer locks and this is skipped.)
    auto self = weak_from_this().lock();
    if (self)
    {
        conn_.request_cancel();
        rd_.request_cancel();
        wr_.request_cancel();

        // Claim every pending op and clear readiness / cancel-pending
        // flags in a single critical section.
        epoll_op* conn_claimed = nullptr;
        epoll_op* rd_claimed   = nullptr;
        epoll_op* wr_claimed   = nullptr;
        {
            std::lock_guard lock(desc_state_.mutex);
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
            rd_claimed   = std::exchange(desc_state_.read_op, nullptr);
            wr_claimed   = std::exchange(desc_state_.write_op, nullptr);
            desc_state_.read_ready             = false;
            desc_state_.write_ready            = false;
            desc_state_.read_cancel_pending    = false;
            desc_state_.write_cancel_pending   = false;
            desc_state_.connect_cancel_pending = false;
        }

        // Post the claimed ops so their awaiters resume. impl_ptr pins the
        // impl until each op drains; work_finished() presumably balances
        // the work count taken at registration — TODO confirm.
        if (conn_claimed)
        {
            conn_.impl_ptr = self;
            svc_.post(&conn_);
            svc_.work_finished();
        }
        if (rd_claimed)
        {
            rd_.impl_ptr = self;
            svc_.post(&rd_);
            svc_.work_finished();
        }
        if (wr_claimed)
        {
            wr_.impl_ptr = self;
            svc_.post(&wr_);
            svc_.work_finished();
        }

        // If the descriptor state is still enqueued inside the scheduler,
        // pin the impl until that queue entry has drained too.
        if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
            desc_state_.impl_ref_ = self;
    }

    // Deregister from epoll only if we ever registered, then close the fd.
    if (fd_ >= 0)
    {
        if (desc_state_.registered_events != 0)
            svc_.scheduler().deregister_descriptor(fd_);
        ::close(fd_);
        fd_ = -1;
    }

    desc_state_.fd                = -1;
    desc_state_.registered_events = 0;

    // Reset cached endpoints to default (unspecified) values.
    local_endpoint_  = endpoint{};
    remote_endpoint_ = endpoint{};
}
726  

725  

727  
/// Construct the service, binding its shared state to the context's
/// epoll scheduler (creating the scheduler service on first use).
inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
    : state_(std::make_unique<epoll_socket_state>(
          ctx.use_service<epoll_scheduler>()))
{
}
733  

732  

734  
// Out-of-line destructor: epoll_socket_state is a complete type here, so
// the unique_ptr member can be destroyed.
inline epoll_socket_service::~epoll_socket_service() = default;
735  

734  

736  
/// Service shutdown: close every socket still owned by the service.
inline void
epoll_socket_service::shutdown()
{
    // The lock keeps construct()/destroy() from mutating the list while
    // we drain it.
    std::lock_guard lock(state_->mutex_);

    while (auto* impl = state_->socket_list_.pop_front())
        impl->close_socket();

    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
    // drains completed_ops_, calling destroy() on each queued op. If we
    // released our shared_ptrs now, an epoll_op::destroy() could free the
    // last ref to an impl whose embedded descriptor_state is still linked
    // in the queue — use-after-free on the next pop(). Letting ~state_
    // release the ptrs (during service destruction, after scheduler
    // shutdown) keeps every impl alive until all ops have been drained.
}
752  

751  

753  
/// Create a new socket implementation and register it with the service.
/// Ownership stays with the service (socket_ptrs_); callers receive a
/// non-owning raw pointer.
inline io_object::implementation*
epoll_socket_service::construct()
{
    auto sock = std::make_shared<epoll_socket>(*this);
    epoll_socket* raw = sock.get();

    {
        // Track the impl in both the intrusive list and the owning map.
        std::lock_guard lock(state_->mutex_);
        state_->socket_list_.push_back(raw);
        state_->socket_ptrs_.emplace(raw, std::move(sock));
    }

    return raw;
}
767  

766  

768  
inline void
767  
inline void
769  
epoll_socket_service::destroy(io_object::implementation* impl)
768  
epoll_socket_service::destroy(io_object::implementation* impl)
770  
{
769  
{
771  
    auto* epoll_impl = static_cast<epoll_socket*>(impl);
770  
    auto* epoll_impl = static_cast<epoll_socket*>(impl);
772  
    epoll_impl->close_socket();
771  
    epoll_impl->close_socket();
773  
    std::lock_guard lock(state_->mutex_);
772  
    std::lock_guard lock(state_->mutex_);
774  
    state_->socket_list_.remove(epoll_impl);
773  
    state_->socket_list_.remove(epoll_impl);
775  
    state_->socket_ptrs_.erase(epoll_impl);
774  
    state_->socket_ptrs_.erase(epoll_impl);
776  
}
775  
}
777  

776  

778  
/// Open a fresh non-blocking socket for the given family/type/protocol and
/// register it with the epoll reactor. Any previously-open descriptor on
/// the impl is closed first.
inline std::error_code
epoll_socket_service::open_socket(
    tcp_socket::implementation& impl, int family, int type, int protocol)
{
    auto* epoll_impl = static_cast<epoll_socket*>(&impl);
    // Release any prior descriptor (and its reactor registration).
    epoll_impl->close_socket();

    // Non-blocking from birth; CLOEXEC so the fd doesn't leak across exec.
    int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
    if (fd < 0)
        return make_err(errno);

    if (family == AF_INET6)
    {
        // Best-effort: make v6 sockets v6-only; the return value is
        // deliberately ignored.
        int one = 1;
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
    }

    epoll_impl->fd_ = fd;

    // Register fd with epoll (edge-triggered mode)
    epoll_impl->desc_state_.fd = fd;
    {
        // Clear any stale op pointers before the reactor can see the state.
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
        epoll_impl->desc_state_.read_op    = nullptr;
        epoll_impl->desc_state_.write_op   = nullptr;
        epoll_impl->desc_state_.connect_op = nullptr;
    }
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);

    return {};
}
810  

808  

811  
inline void
809  
inline void
812  
epoll_socket_service::close(io_object::handle& h)
810  
epoll_socket_service::close(io_object::handle& h)
813  
{
811  
{
814  
    static_cast<epoll_socket*>(h.get())->close_socket();
812  
    static_cast<epoll_socket*>(h.get())->close_socket();
815  
}
813  
}
816  

814  

817  
inline void
815  
inline void
818  
epoll_socket_service::post(epoll_op* op)
816  
epoll_socket_service::post(epoll_op* op)
819  
{
817  
{
820  
    state_->sched_.post(op);
818  
    state_->sched_.post(op);
821  
}
819  
}
822  

820  

823  
inline void
821  
inline void
824  
epoll_socket_service::work_started() noexcept
822  
epoll_socket_service::work_started() noexcept
825  
{
823  
{
826  
    state_->sched_.work_started();
824  
    state_->sched_.work_started();
827  
}
825  
}
828  

826  

829  
inline void
827  
inline void
830  
epoll_socket_service::work_finished() noexcept
828  
epoll_socket_service::work_finished() noexcept
831  
{
829  
{
832  
    state_->sched_.work_finished();
830  
    state_->sched_.work_finished();
833  
}
831  
}
834  

832  

835  
} // namespace boost::corosio::detail
833  
} // namespace boost::corosio::detail
836  

834  

837  
#endif // BOOST_COROSIO_HAS_EPOLL
835  
#endif // BOOST_COROSIO_HAS_EPOLL
838  

836  

839  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
837  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP