TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/corosio/io/io_object.hpp>
19 : #include <boost/corosio/endpoint.hpp>
20 : #include <boost/capy/ex/executor_ref.hpp>
21 : #include <coroutine>
22 : #include <boost/capy/error.hpp>
23 : #include <system_error>
24 :
25 : #include <boost/corosio/native/detail/make_err.hpp>
26 : #include <boost/corosio/detail/dispatch_coro.hpp>
27 : #include <boost/corosio/detail/scheduler_op.hpp>
28 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
29 :
30 : #include <unistd.h>
31 : #include <errno.h>
32 :
33 : #include <atomic>
34 : #include <cstddef>
35 : #include <memory>
36 : #include <mutex>
37 : #include <optional>
38 : #include <stop_token>
39 :
40 : #include <netinet/in.h>
41 : #include <sys/socket.h>
42 : #include <sys/uio.h>
43 :
44 : /*
45 : epoll Operation State
46 : =====================
47 :
48 : Each async I/O operation has a corresponding epoll_op-derived struct that
49 : holds the operation's state while it's in flight. The socket impl owns
50 : fixed slots for each operation type (conn_, rd_, wr_), so only one
51 : operation of each type can be pending per socket at a time.
52 :
53 : Persistent Registration
54 : -----------------------
55 : File descriptors are registered with epoll once (via descriptor_state) and
56 : stay registered until closed. The descriptor_state tracks which operations
57 : are pending (read_op, write_op, connect_op). When an event arrives, the
58 : reactor dispatches to the appropriate pending operation.
59 :
60 : Impl Lifetime Management
61 : ------------------------
62 : When cancel() posts an op to the scheduler's ready queue, the socket impl
63 : might be destroyed before the scheduler processes the op. The `impl_ptr`
64 : member holds a shared_ptr to the impl, keeping it alive until the op
65 : completes. This is set by cancel() and cleared in operator() after the
66 : coroutine is resumed.
67 :
68 : EOF Detection
69 : -------------
70 : For reads, 0 bytes with no error means EOF. But an empty user buffer also
71 : returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
72 :
73 : SIGPIPE Prevention
74 : ------------------
75 : Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
76 : SIGPIPE when the peer has closed.
77 : */
78 :
79 : namespace boost::corosio::detail {
80 :
// Forward declarations: only pointers to these types appear in this header.
class epoll_socket;
class epoll_acceptor;
class epoll_scheduler;
struct epoll_op;
88 :
89 : /** Per-descriptor state for persistent epoll registration.
90 :
91 : Tracks pending operations for a file descriptor. The fd is registered
92 : once with epoll and stays registered until closed.
93 :
94 : This struct extends scheduler_op to support deferred I/O processing.
95 : When epoll events arrive, the reactor sets ready_events and queues
96 : this descriptor for processing. When popped from the scheduler queue,
97 : operator() performs the actual I/O and queues completion handlers.
98 :
99 : @par Deferred I/O Model
100 : The reactor no longer performs I/O directly. Instead:
101 : 1. Reactor sets ready_events and queues descriptor_state
102 : 2. Scheduler pops descriptor_state and calls operator()
103 : 3. operator() performs I/O under mutex and queues completions
104 :
105 : This eliminates per-descriptor mutex locking from the reactor hot path.
106 :
107 : @par Thread Safety
108 : The mutex protects operation pointers and ready flags during I/O.
109 : ready_events_ and is_enqueued_ are atomic for lock-free reactor access.
110 : */
111 : struct descriptor_state final : scheduler_op
112 : {
113 : std::mutex mutex;
114 :
115 : // Protected by mutex
116 : epoll_op* read_op = nullptr;
117 : epoll_op* write_op = nullptr;
118 : epoll_op* connect_op = nullptr;
119 :
120 : // Caches edge events that arrived before an op was registered
121 : bool read_ready = false;
122 : bool write_ready = false;
123 :
124 : // Deferred cancellation: set by cancel() when the target op is not
125 : // parked (e.g. completing inline via speculative I/O). Checked when
126 : // the next op parks; if set, the op is immediately self-cancelled.
127 : // This matches IOCP semantics where CancelIoEx always succeeds.
128 : bool read_cancel_pending = false;
129 : bool write_cancel_pending = false;
130 : bool connect_cancel_pending = false;
131 :
132 : // Set during registration only (no mutex needed)
133 : std::uint32_t registered_events = 0;
134 : int fd = -1;
135 :
136 : // For deferred I/O - set by reactor, read by scheduler
137 : std::atomic<std::uint32_t> ready_events_{0};
138 : std::atomic<bool> is_enqueued_{false};
139 : epoll_scheduler const* scheduler_ = nullptr;
140 :
141 : // Prevents impl destruction while this descriptor_state is queued.
142 : // Set by close_socket() when is_enqueued_ is true, cleared by operator().
143 : std::shared_ptr<void> impl_ref_;
144 :
145 : /// Add ready events atomically.
146 HIT 45424 : void add_ready_events(std::uint32_t ev) noexcept
147 : {
148 45424 : ready_events_.fetch_or(ev, std::memory_order_relaxed);
149 45424 : }
150 :
151 : /// Perform deferred I/O and queue completions.
152 : void operator()() override;
153 :
154 : /// Destroy without invoking.
155 : /// Called during scheduler::shutdown() drain. Clear impl_ref_ to break
156 : /// the self-referential cycle set by close_socket().
157 30 : void destroy() override
158 : {
159 30 : impl_ref_.reset();
160 30 : }
161 : };
162 :
163 : struct epoll_op : scheduler_op
164 : {
165 : struct canceller
166 : {
167 : epoll_op* op;
168 : void operator()() const noexcept;
169 : };
170 :
171 : std::coroutine_handle<> h;
172 : capy::executor_ref ex;
173 : std::error_code* ec_out = nullptr;
174 : std::size_t* bytes_out = nullptr;
175 :
176 : int fd = -1;
177 : int errn = 0;
178 : std::size_t bytes_transferred = 0;
179 :
180 : std::atomic<bool> cancelled{false};
181 : std::optional<std::stop_callback<canceller>> stop_cb;
182 :
183 : // Prevents use-after-free when socket is closed with pending ops.
184 : // See "Impl Lifetime Management" in file header.
185 : std::shared_ptr<void> impl_ptr;
186 :
187 : // For stop_token cancellation - pointer to owning socket/acceptor impl.
188 : // When stop is requested, we call back to the impl to perform actual I/O cancellation.
189 : epoll_socket* socket_impl_ = nullptr;
190 : epoll_acceptor* acceptor_impl_ = nullptr;
191 :
192 42871 : epoll_op() = default;
193 :
194 261246 : void reset() noexcept
195 : {
196 261246 : fd = -1;
197 261246 : errn = 0;
198 261246 : bytes_transferred = 0;
199 261246 : cancelled.store(false, std::memory_order_relaxed);
200 261246 : impl_ptr.reset();
201 261246 : socket_impl_ = nullptr;
202 261246 : acceptor_impl_ = nullptr;
203 261246 : }
204 :
205 : // Defined in sockets.cpp where epoll_socket is complete
206 : void operator()() override;
207 :
208 25126 : virtual bool is_read_operation() const noexcept
209 : {
210 25126 : return false;
211 : }
212 : virtual void cancel() noexcept = 0;
213 :
214 MIS 0 : void destroy() override
215 : {
216 0 : stop_cb.reset();
217 0 : impl_ptr.reset();
218 0 : }
219 :
220 HIT 129276 : void request_cancel() noexcept
221 : {
222 129276 : cancelled.store(true, std::memory_order_release);
223 129276 : }
224 :
225 55178 : void start(std::stop_token const& token, epoll_socket* impl)
226 : {
227 55178 : cancelled.store(false, std::memory_order_release);
228 55178 : stop_cb.reset();
229 55178 : socket_impl_ = impl;
230 55178 : acceptor_impl_ = nullptr;
231 :
232 55178 : if (token.stop_possible())
233 99 : stop_cb.emplace(token, canceller{this});
234 55178 : }
235 :
236 4742 : void start(std::stop_token const& token, epoll_acceptor* impl)
237 : {
238 4742 : cancelled.store(false, std::memory_order_release);
239 4742 : stop_cb.reset();
240 4742 : socket_impl_ = nullptr;
241 4742 : acceptor_impl_ = impl;
242 :
243 4742 : if (token.stop_possible())
244 9 : stop_cb.emplace(token, canceller{this});
245 4742 : }
246 :
247 59856 : void complete(int err, std::size_t bytes) noexcept
248 : {
249 59856 : errn = err;
250 59856 : bytes_transferred = bytes;
251 59856 : }
252 :
253 MIS 0 : virtual void perform_io() noexcept {}
254 : };
255 :
256 : struct epoll_connect_op final : epoll_op
257 : {
258 : endpoint target_endpoint;
259 :
260 HIT 4736 : void reset() noexcept
261 : {
262 4736 : epoll_op::reset();
263 4736 : target_endpoint = endpoint{};
264 4736 : }
265 :
266 4734 : void perform_io() noexcept override
267 : {
268 : // connect() completion status is retrieved via SO_ERROR, not return value
269 4734 : int err = 0;
270 4734 : socklen_t len = sizeof(err);
271 4734 : if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
272 MIS 0 : err = errno;
273 HIT 4734 : complete(err, 0);
274 4734 : }
275 :
276 : // Defined in sockets.cpp where epoll_socket is complete
277 : void operator()() override;
278 : void cancel() noexcept override;
279 : };
280 :
281 : struct epoll_read_op final : epoll_op
282 : {
283 : static constexpr std::size_t max_buffers = 16;
284 : iovec iovecs[max_buffers];
285 : int iovec_count = 0;
286 : bool empty_buffer_read = false;
287 :
288 25111 : bool is_read_operation() const noexcept override
289 : {
290 25111 : return !empty_buffer_read;
291 : }
292 :
293 125984 : void reset() noexcept
294 : {
295 125984 : epoll_op::reset();
296 125984 : iovec_count = 0;
297 125984 : empty_buffer_read = false;
298 125984 : }
299 :
300 146 : void perform_io() noexcept override
301 : {
302 : ssize_t n;
303 : do
304 : {
305 146 : n = ::readv(fd, iovecs, iovec_count);
306 : }
307 146 : while (n < 0 && errno == EINTR);
308 :
309 146 : if (n >= 0)
310 4 : complete(0, static_cast<std::size_t>(n));
311 : else
312 142 : complete(errno, 0);
313 146 : }
314 :
315 : void cancel() noexcept override;
316 : };
317 :
318 : struct epoll_write_op final : epoll_op
319 : {
320 : static constexpr std::size_t max_buffers = 16;
321 : iovec iovecs[max_buffers];
322 : int iovec_count = 0;
323 :
324 125784 : void reset() noexcept
325 : {
326 125784 : epoll_op::reset();
327 125784 : iovec_count = 0;
328 125784 : }
329 :
330 MIS 0 : void perform_io() noexcept override
331 : {
332 0 : msghdr msg{};
333 0 : msg.msg_iov = iovecs;
334 0 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
335 :
336 : ssize_t n;
337 : do
338 : {
339 0 : n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
340 : }
341 0 : while (n < 0 && errno == EINTR);
342 :
343 0 : if (n >= 0)
344 0 : complete(0, static_cast<std::size_t>(n));
345 : else
346 0 : complete(errno, 0);
347 0 : }
348 :
349 : void cancel() noexcept override;
350 : };
351 :
352 : struct epoll_accept_op final : epoll_op
353 : {
354 : int accepted_fd = -1;
355 : io_object::implementation** impl_out = nullptr;
356 : sockaddr_storage peer_storage{};
357 :
358 HIT 4742 : void reset() noexcept
359 : {
360 4742 : epoll_op::reset();
361 4742 : accepted_fd = -1;
362 4742 : impl_out = nullptr;
363 4742 : peer_storage = {};
364 4742 : }
365 :
366 4731 : void perform_io() noexcept override
367 : {
368 4731 : socklen_t addrlen = sizeof(peer_storage);
369 : int new_fd;
370 : do
371 : {
372 9462 : new_fd = ::accept4(
373 4731 : fd, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
374 : SOCK_NONBLOCK | SOCK_CLOEXEC);
375 : }
376 4731 : while (new_fd < 0 && errno == EINTR);
377 :
378 4731 : if (new_fd >= 0)
379 : {
380 4731 : accepted_fd = new_fd;
381 4731 : complete(0, 0);
382 : }
383 : else
384 : {
385 MIS 0 : complete(errno, 0);
386 : }
387 HIT 4731 : }
388 :
389 : // Defined in acceptors.cpp where epoll_acceptor is complete
390 : void operator()() override;
391 : void cancel() noexcept override;
392 : };
393 :
394 : } // namespace boost::corosio::detail
395 :
396 : #endif // BOOST_COROSIO_HAS_EPOLL
397 :
398 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
|