TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/socket_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
22 : #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
23 :
24 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
25 : #include <boost/corosio/native/detail/make_err.hpp>
26 : #include <boost/corosio/detail/dispatch_coro.hpp>
27 : #include <boost/corosio/detail/except.hpp>
28 : #include <boost/capy/buffers.hpp>
29 :
30 : #include <coroutine>
31 : #include <mutex>
32 : #include <unordered_map>
33 : #include <utility>
34 :
35 : #include <errno.h>
36 : #include <netinet/in.h>
37 : #include <netinet/tcp.h>
38 : #include <sys/epoll.h>
39 : #include <sys/socket.h>
40 : #include <unistd.h>
41 :
42 : /*
43 : epoll Socket Implementation
44 : ===========================
45 :
46 : Each I/O operation follows the same pattern:
47 : 1. Try the syscall immediately (non-blocking socket)
48 : 2. If it succeeds or fails with a real error, post to completion queue
49 : 3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
50 :
51 : This "try first" approach avoids unnecessary epoll round-trips for
52 : operations that can complete immediately (common for small reads/writes
53 : on fast local connections).
54 :
55 : One-Shot Registration
56 : ---------------------
57 : We use one-shot epoll registration: each operation registers, waits for
58 : one event, then unregisters. This simplifies the state machine since we
59 : don't need to track whether an fd is currently registered or handle
60 : re-arming. The tradeoff is slightly more epoll_ctl calls, but the
61 : simplicity is worth it.
62 :
63 : Cancellation
64 : ------------
65 : See op.hpp for the completion/cancellation race handling via the
66 : `registered` atomic. cancel() must complete pending operations (post
67 : them with cancelled flag) so coroutines waiting on them can resume.
68 : close_socket() calls cancel() first to ensure this.
69 :
70 : Impl Lifetime with shared_ptr
71 : -----------------------------
72 : Socket impls use enable_shared_from_this. The service owns impls via
73 : shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
74 : removal. When a user calls close(), we call cancel() which posts pending
75 : ops to the scheduler.
76 :
77 : CRITICAL: The posted ops must keep the impl alive until they complete.
78 : Otherwise the scheduler would process a freed op (use-after-free). The
79 : cancel() method captures shared_from_this() into op.impl_ptr before
80 : posting. When the op completes, impl_ptr is cleared, allowing the impl
81 : to be destroyed if no other references exist.
82 :
83 : Service Ownership
84 : -----------------
85 : epoll_socket_service owns all socket impls. destroy_impl() removes the
86 : shared_ptr from the map, but the impl may survive if ops still hold
87 : impl_ptr refs. shutdown() closes all sockets and clears the map; any
88 : in-flight ops will complete and release their refs.
89 : */
90 :
91 : namespace boost::corosio::detail {
92 :
93 : /** State for epoll socket service. */
class epoll_socket_state
{
public:
    explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
    {
    }

    // Scheduler obtained from the execution context at service construction;
    // it outlives this state (scheduler shuts down after the socket service,
    // see shutdown() below).
    epoll_scheduler& sched_;
    // Guards socket_list_ and socket_ptrs_.
    std::mutex mutex_;
    // Intrusive list of all live socket impls; lets shutdown() walk and
    // close every socket without allocating.
    intrusive_list<epoll_socket> socket_list_;
    // Owning refs keyed by raw pointer for O(1) lookup/removal. See the
    // "Impl Lifetime with shared_ptr" section of the file comment.
    std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
        socket_ptrs_;
};
107 :
108 : /** epoll socket service implementation.
109 :
110 : Inherits from socket_service to enable runtime polymorphism.
111 : Uses key_type = socket_service for service lookup.
112 : */
class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
{
public:
    explicit epoll_socket_service(capy::execution_context& ctx);
    ~epoll_socket_service() override;

    epoll_socket_service(epoll_socket_service const&) = delete;
    epoll_socket_service& operator=(epoll_socket_service const&) = delete;

    // Closes every live socket; owning shared_ptrs are deliberately kept
    // until destruction (see implementation for rationale).
    void shutdown() override;

    // Creates a new socket impl and registers it in the service's maps.
    io_object::implementation* construct() override;
    // Closes the socket and removes the service's owning reference.
    void destroy(io_object::implementation*) override;
    // Closes the underlying descriptor without destroying the impl.
    void close(io_object::handle&) override;
    // Opens a non-blocking socket and registers it with the reactor.
    std::error_code open_socket(
        tcp_socket::implementation& impl,
        int family,
        int type,
        int protocol) override;

    // The scheduler shared by all sockets of this service.
    epoll_scheduler& scheduler() const noexcept
    {
        return state_->sched_;
    }
    // Queue a completed op for execution on the scheduler.
    void post(epoll_op* op);
    // Work-count bookkeeping forwarded to the scheduler; an outstanding
    // reactor wait must keep the scheduler's run loop alive.
    void work_started() noexcept;
    void work_finished() noexcept;

private:
    std::unique_ptr<epoll_socket_state> state_;
};
144 :
145 : //--------------------------------------------------------------------------
146 : //
147 : // Implementation
148 : //
149 : //--------------------------------------------------------------------------
150 :
151 : // Register an op with the reactor, handling cached edge events.
152 : // Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
inline void
epoll_socket::register_op(
    epoll_op& op,
    epoll_op*& desc_slot,
    bool& ready_flag,
    bool& cancel_flag) noexcept
{
    // Account for the pending reactor wait before taking the lock, so the
    // scheduler cannot decide it is out of work while we register.
    svc_.work_started();

    std::lock_guard lock(desc_state_.mutex);
    bool io_done = false;
    if (ready_flag)
    {
        // A readiness edge arrived (and was cached) between the caller's
        // failed speculative syscall and this registration. Consume it and
        // retry the I/O under the lock.
        ready_flag = false;
        op.perform_io();
        io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
        if (!io_done)
            // Still not ready: clear the transient EAGAIN so a later
            // completion doesn't report it as a real error.
            op.errn = 0;
    }

    if (cancel_flag)
    {
        // A cancel raced ahead of registration and left its request cached
        // on the descriptor; apply it to this op now.
        cancel_flag = false;
        op.cancelled.store(true, std::memory_order_relaxed);
    }

    if (io_done || op.cancelled.load(std::memory_order_acquire))
    {
        // Op finished (or was cancelled) without needing the reactor:
        // complete it via the queue and release the work count taken above.
        svc_.post(&op);
        svc_.work_finished();
    }
    else
    {
        // Park the op in the descriptor slot; the reactor thread will
        // perform the I/O when the fd becomes ready.
        desc_slot = &op;
    }
}
189 :
// Stop-token callback: forwards the stop request to the owning op's
// virtual cancel(), which dispatches to the connect/read/write variant.
inline void
epoll_op::canceller::operator()() const noexcept
{
    op->cancel();
}
195 :
196 : inline void
197 MIS 0 : epoll_connect_op::cancel() noexcept
198 : {
199 0 : if (socket_impl_)
200 0 : socket_impl_->cancel_single_op(*this);
201 : else
202 0 : request_cancel();
203 0 : }
204 :
205 : inline void
206 HIT 98 : epoll_read_op::cancel() noexcept
207 : {
208 98 : if (socket_impl_)
209 98 : socket_impl_->cancel_single_op(*this);
210 : else
211 MIS 0 : request_cancel();
212 HIT 98 : }
213 :
214 : inline void
215 MIS 0 : epoll_write_op::cancel() noexcept
216 : {
217 0 : if (socket_impl_)
218 0 : socket_impl_->cancel_single_op(*this);
219 : else
220 0 : request_cancel();
221 0 : }
222 :
// Completion handler for read/write ops, run from the scheduler queue.
// Translates the op's recorded state into (*ec_out, *bytes_out) and
// resumes the waiting coroutine. Error precedence: cancellation wins over
// a saved errno, which wins over the zero-bytes EOF signal.
inline void
epoll_op::operator()()
{
    // Detach the stop callback first so a late stop request cannot race
    // with completion.
    stop_cb.reset();

    socket_impl_->svc_.scheduler().reset_inline_budget();

    if (cancelled.load(std::memory_order_acquire))
        *ec_out = capy::error::canceled;
    else if (errn != 0)
        *ec_out = make_err(errn);
    else if (is_read_operation() && bytes_transferred == 0)
        // A successful read of zero bytes means the peer closed the stream.
        *ec_out = capy::error::eof;
    else
        *ec_out = {};

    *bytes_out = bytes_transferred;

    // Move to stack before resuming coroutine. The coroutine might close
    // the socket, releasing the last wrapper ref. If impl_ptr were the
    // last ref and we destroyed it while still in operator(), we'd have
    // use-after-free. Moving to local ensures destruction happens at
    // function exit, after all member accesses are complete.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    auto prevent_premature_destruction = std::move(impl_ptr);
    dispatch_coro(saved_ex, saved_h).resume();
}
251 :
// Completion handler for connect ops, run from the scheduler queue.
// On success, snapshots the local endpoint via getsockname and caches both
// endpoints on the socket impl before resuming the coroutine.
inline void
epoll_connect_op::operator()()
{
    // Detach the stop callback first so a late stop request cannot race
    // with completion.
    stop_cb.reset();

    socket_impl_->svc_.scheduler().reset_inline_budget();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect
    if (success && socket_impl_)
    {
        endpoint local_ep;
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        // getsockname failure is tolerated: local_ep stays default-built.
        if (::getsockname(
                fd, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
            0)
            local_ep = from_sockaddr(local_storage);
        static_cast<epoll_socket*>(socket_impl_)
            ->set_endpoints(local_ep, target_endpoint);
    }

    // Cancellation takes precedence over a saved errno.
    if (cancelled.load(std::memory_order_acquire))
        *ec_out = capy::error::canceled;
    else if (errn != 0)
        *ec_out = make_err(errn);
    else
        *ec_out = {};

    // Move to stack before resuming. See epoll_op::operator()() for rationale.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    auto prevent_premature_destruction = std::move(impl_ptr);
    dispatch_coro(saved_ex, saved_h).resume();
}
288 :
// A socket impl is bound to its owning service for its entire lifetime;
// all descriptor state starts closed (fd_ = -1 via member initializers
// declared in the class — TODO confirm against epoll_socket.hpp).
inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
    : svc_(svc)
{
}

// Destructor is trivial: close_socket() is expected to have run via
// destroy()/shutdown() before the last reference drops.
inline epoll_socket::~epoll_socket() = default;
295 :
// Start an asynchronous connect to `ep`, resuming `h` on completion.
// Follows the file's "try first" pattern: attempt a non-blocking
// ::connect(); only on EINPROGRESS is the op parked in the reactor.
// Returns the coroutine to run next (either the resumed caller when the
// result is delivered inline, or noop when completion is deferred).
inline std::coroutine_handle<>
epoll_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;

    sockaddr_storage storage{};
    socklen_t addrlen =
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
    int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);

    if (result == 0)
    {
        // Immediate success (possible for e.g. loopback): cache endpoints
        // now since the reactor completion path won't run.
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        if (::getsockname(
                fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
            0)
            local_endpoint_ = detail::from_sockaddr(local_storage);
        remote_endpoint_ = ep;
    }

    if (result == 0 || errno != EINPROGRESS)
    {
        // Completed (or failed hard) without the reactor.
        int err = (result < 0) ? errno : 0;
        if (svc_.scheduler().try_consume_inline_budget())
        {
            // Inline budget available: deliver the result directly and
            // resume the caller without a queue round-trip.
            *ec = err ? make_err(err) : std::error_code{};
            return dispatch_coro(ex, h);
        }
        // Budget exhausted: complete through the scheduler queue to bound
        // inline resumption depth. impl_ptr keeps this impl alive until
        // the queued op runs (see file comment).
        op.reset();
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.fd = fd_;
        op.target_endpoint = ep;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(err, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EINPROGRESS — register with reactor
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.fd = fd_;
    op.target_endpoint = ep;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    // Connect completion is signalled by writability.
    register_op(
        op, desc_state_.connect_op, desc_state_.write_ready,
        desc_state_.connect_cancel_pending);
    return std::noop_coroutine();
}
358 :
// Start an asynchronous scatter read into the caller's buffers, resuming
// `h` with (*ec, *bytes_out) filled in. Tries a speculative ::readv()
// first; only EAGAIN/EWOULDBLOCK causes reactor registration.
inline std::coroutine_handle<>
epoll_socket::read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = rd_;
    op.reset();

    capy::mutable_buffer bufs[epoll_read_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));

    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        // Zero-length read: complete immediately with 0 bytes. The
        // empty_buffer_read flag distinguishes this from a true EOF in
        // the completion handler.
        op.empty_buffer_read = true;
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(0, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    // Speculative read
    ssize_t n;
    do
    {
        // Retry on EINTR: signal interruptions are not real errors.
        n = ::readv(fd_, op.iovecs, op.iovec_count);
    }
    while (n < 0 && errno == EINTR);

    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
    {
        // Completed (success, EOF, or hard error) without the reactor.
        int err = (n < 0) ? errno : 0;
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);

        if (svc_.scheduler().try_consume_inline_budget())
        {
            // Inline budget available: deliver directly, mapping a
            // successful zero-byte read to EOF.
            if (err)
                *ec = make_err(err);
            else if (n == 0)
                *ec = capy::error::eof;
            else
                *ec = {};
            *bytes_out = bytes;
            return dispatch_coro(ex, h);
        }
        // Budget exhausted: complete through the scheduler queue.
        // impl_ptr keeps this impl alive until the queued op runs.
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(err, bytes);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EAGAIN — register with reactor
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    register_op(
        op, desc_state_.read_op, desc_state_.read_ready,
        desc_state_.read_cancel_pending);
    return std::noop_coroutine();
}
444 :
// Start an asynchronous gather write from the caller's buffers, resuming
// `h` with (*ec, *bytes_out) filled in. Tries a speculative ::sendmsg()
// first; only EAGAIN/EWOULDBLOCK causes reactor registration.
inline std::coroutine_handle<>
epoll_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = wr_;
    op.reset();

    capy::mutable_buffer bufs[epoll_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));

    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        // Zero-length write: complete immediately with 0 bytes.
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(0, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    // Speculative write
    msghdr msg{};
    msg.msg_iov = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    ssize_t n;
    do
    {
        // MSG_NOSIGNAL: report a closed peer as EPIPE instead of raising
        // SIGPIPE. Retry on EINTR.
        n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
    }
    while (n < 0 && errno == EINTR);

    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
    {
        // Completed (possibly a short write) or failed hard without the
        // reactor. Short writes are the caller's concern (write_SOME).
        int err = (n < 0) ? errno : 0;
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);

        if (svc_.scheduler().try_consume_inline_budget())
        {
            // Inline budget available: deliver directly.
            *ec = err ? make_err(err) : std::error_code{};
            *bytes_out = bytes;
            return dispatch_coro(ex, h);
        }
        // Budget exhausted: complete through the scheduler queue.
        // impl_ptr keeps this impl alive until the queued op runs.
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(err, bytes);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EAGAIN — register with reactor
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    register_op(
        op, desc_state_.write_op, desc_state_.write_ready,
        desc_state_.write_cancel_pending);
    return std::noop_coroutine();
}
528 :
529 : inline std::error_code
530 HIT 3 : epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
531 : {
532 : int how;
533 3 : switch (what)
534 : {
535 1 : case tcp_socket::shutdown_receive:
536 1 : how = SHUT_RD;
537 1 : break;
538 1 : case tcp_socket::shutdown_send:
539 1 : how = SHUT_WR;
540 1 : break;
541 1 : case tcp_socket::shutdown_both:
542 1 : how = SHUT_RDWR;
543 1 : break;
544 MIS 0 : default:
545 0 : return make_err(EINVAL);
546 : }
547 HIT 3 : if (::shutdown(fd_, how) != 0)
548 MIS 0 : return make_err(errno);
549 HIT 3 : return {};
550 : }
551 :
552 : inline std::error_code
553 32 : epoll_socket::set_option(
554 : int level, int optname, void const* data, std::size_t size) noexcept
555 : {
556 32 : if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
557 : 0)
558 MIS 0 : return make_err(errno);
559 HIT 32 : return {};
560 : }
561 :
562 : inline std::error_code
563 31 : epoll_socket::get_option(
564 : int level, int optname, void* data, std::size_t* size) const noexcept
565 : {
566 31 : socklen_t len = static_cast<socklen_t>(*size);
567 31 : if (::getsockopt(fd_, level, optname, data, &len) != 0)
568 MIS 0 : return make_err(errno);
569 HIT 31 : *size = static_cast<std::size_t>(len);
570 31 : return {};
571 : }
572 :
// Cancel all outstanding operations on this socket. Any op currently
// parked in a descriptor slot is "claimed" under the lock and posted to
// the completion queue with its cancelled flag set; ops not yet parked
// get a cancel_pending flag so register_op() will cancel them on arrival.
inline void
epoll_socket::cancel() noexcept
{
    // If the impl is already being destroyed there is nothing to post.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    // Mark all three ops cancel-requested before touching the slots.
    conn_.request_cancel();
    rd_.request_cancel();
    wr_.request_cancel();

    // Claim parked ops under the lock; record pending cancels for ops
    // that are mid-registration (between syscall and register_op).
    epoll_op* conn_claimed = nullptr;
    epoll_op* rd_claimed = nullptr;
    epoll_op* wr_claimed = nullptr;
    {
        std::lock_guard lock(desc_state_.mutex);
        if (desc_state_.connect_op == &conn_)
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
        else
            desc_state_.connect_cancel_pending = true;
        if (desc_state_.read_op == &rd_)
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
        else
            desc_state_.read_cancel_pending = true;
        if (desc_state_.write_op == &wr_)
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
        else
            desc_state_.write_cancel_pending = true;
    }

    // Post claimed ops outside the lock. impl_ptr keeps the impl alive
    // until the op runs (see file comment); work_finished balances the
    // work_started taken when the op was registered.
    if (conn_claimed)
    {
        conn_.impl_ptr = self;
        svc_.post(&conn_);
        svc_.work_finished();
    }
    if (rd_claimed)
    {
        rd_.impl_ptr = self;
        svc_.post(&rd_);
        svc_.work_finished();
    }
    if (wr_claimed)
    {
        wr_.impl_ptr = self;
        svc_.post(&wr_);
        svc_.work_finished();
    }
}
622 :
// Cancel exactly one operation (invoked from that op's stop callback).
// Same claim-or-flag protocol as cancel(), restricted to the matching
// descriptor slot.
inline void
epoll_socket::cancel_single_op(epoll_op& op) noexcept
{
    // If the impl is already being destroyed there is nothing to post.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    op.request_cancel();

    // Identify which descriptor slot this op would be parked in.
    epoll_op** desc_op_ptr = nullptr;
    if (&op == &conn_)
        desc_op_ptr = &desc_state_.connect_op;
    else if (&op == &rd_)
        desc_op_ptr = &desc_state_.read_op;
    else if (&op == &wr_)
        desc_op_ptr = &desc_state_.write_op;

    if (desc_op_ptr)
    {
        epoll_op* claimed = nullptr;
        {
            std::lock_guard lock(desc_state_.mutex);
            if (*desc_op_ptr == &op)
                // Op is parked: claim it so the reactor won't touch it.
                claimed = std::exchange(*desc_op_ptr, nullptr);
            // Otherwise the op is mid-registration; leave a pending flag
            // for register_op() to pick up under the same lock.
            else if (&op == &conn_)
                desc_state_.connect_cancel_pending = true;
            else if (&op == &rd_)
                desc_state_.read_cancel_pending = true;
            else if (&op == &wr_)
                desc_state_.write_cancel_pending = true;
        }
        if (claimed)
        {
            // Post outside the lock; impl_ptr keeps the impl alive and
            // work_finished balances register_op()'s work_started.
            op.impl_ptr = self;
            svc_.post(&op);
            svc_.work_finished();
        }
    }
}
662 :
// Close the underlying descriptor and flush all pending operations.
// Performs an inline cancel-all (claiming every parked op and clearing
// all cached readiness/cancel flags), deregisters the fd from epoll,
// closes it, and resets cached endpoints. Safe to call repeatedly.
inline void
epoll_socket::close_socket() noexcept
{
    // lock() fails during destruction; in that case there can be no ops
    // to post, but the fd teardown below must still run.
    auto self = weak_from_this().lock();
    if (self)
    {
        conn_.request_cancel();
        rd_.request_cancel();
        wr_.request_cancel();

        // Claim every parked op and wipe all cached descriptor flags —
        // unlike cancel(), close invalidates the fd so nothing may remain
        // registered or pending.
        epoll_op* conn_claimed = nullptr;
        epoll_op* rd_claimed = nullptr;
        epoll_op* wr_claimed = nullptr;
        {
            std::lock_guard lock(desc_state_.mutex);
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
            desc_state_.read_ready = false;
            desc_state_.write_ready = false;
            desc_state_.read_cancel_pending = false;
            desc_state_.write_cancel_pending = false;
            desc_state_.connect_cancel_pending = false;
        }

        // Post claimed ops with an owning ref so they outlive this impl;
        // work_finished balances register_op()'s work_started.
        if (conn_claimed)
        {
            conn_.impl_ptr = self;
            svc_.post(&conn_);
            svc_.work_finished();
        }
        if (rd_claimed)
        {
            rd_.impl_ptr = self;
            svc_.post(&rd_);
            svc_.work_finished();
        }
        if (wr_claimed)
        {
            wr_.impl_ptr = self;
            svc_.post(&wr_);
            svc_.work_finished();
        }

        // If the descriptor state is still sitting in a scheduler queue,
        // pin the impl so the queue entry can't outlive it.
        if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
            desc_state_.impl_ref_ = self;
    }

    if (fd_ >= 0)
    {
        if (desc_state_.registered_events != 0)
            svc_.scheduler().deregister_descriptor(fd_);
        ::close(fd_);
        fd_ = -1;
    }

    desc_state_.fd = -1;
    desc_state_.registered_events = 0;

    local_endpoint_ = endpoint{};
    remote_endpoint_ = endpoint{};
}
725 :
// Look up (or create) the context's epoll_scheduler service and bind it
// into the heap-allocated service state.
inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
    : state_(
          std::make_unique<epoll_socket_state>(
              ctx.use_service<epoll_scheduler>()))
{
}
732 :
733 478 : inline epoll_socket_service::~epoll_socket_service() {}
734 :
// Service shutdown: close every live socket. Runs under the state mutex
// so no socket can be constructed or destroyed concurrently.
inline void
epoll_socket_service::shutdown()
{
    std::lock_guard lock(state_->mutex_);

    // Drain the intrusive list; close_socket() cancels and posts any
    // in-flight ops for each impl.
    while (auto* impl = state_->socket_list_.pop_front())
        impl->close_socket();

    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
    // drains completed_ops_, calling destroy() on each queued op. If we
    // released our shared_ptrs now, an epoll_op::destroy() could free the
    // last ref to an impl whose embedded descriptor_state is still linked
    // in the queue — use-after-free on the next pop(). Letting ~state_
    // release the ptrs (during service destruction, after scheduler
    // shutdown) keeps every impl alive until all ops have been drained.
}
751 :
752 : inline io_object::implementation*
753 14263 : epoll_socket_service::construct()
754 : {
755 14263 : auto impl = std::make_shared<epoll_socket>(*this);
756 14263 : auto* raw = impl.get();
757 :
758 : {
759 14263 : std::lock_guard lock(state_->mutex_);
760 14263 : state_->socket_list_.push_back(raw);
761 14263 : state_->socket_ptrs_.emplace(raw, std::move(impl));
762 14263 : }
763 :
764 14263 : return raw;
765 14263 : }
766 :
767 : inline void
768 14263 : epoll_socket_service::destroy(io_object::implementation* impl)
769 : {
770 14263 : auto* epoll_impl = static_cast<epoll_socket*>(impl);
771 14263 : epoll_impl->close_socket();
772 14263 : std::lock_guard lock(state_->mutex_);
773 14263 : state_->socket_list_.remove(epoll_impl);
774 14263 : state_->socket_ptrs_.erase(epoll_impl);
775 14263 : }
776 :
// Open a fresh non-blocking socket for `impl` and register it with the
// reactor. Any previously-open descriptor is closed first, so reopening
// an impl is safe. Returns errno from ::socket() on failure.
inline std::error_code
epoll_socket_service::open_socket(
    tcp_socket::implementation& impl, int family, int type, int protocol)
{
    auto* epoll_impl = static_cast<epoll_socket*>(&impl);
    epoll_impl->close_socket();

    // SOCK_NONBLOCK is required by the speculative try-first I/O pattern;
    // SOCK_CLOEXEC keeps the fd out of forked children.
    int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
    if (fd < 0)
        return make_err(errno);

    if (family == AF_INET6)
    {
        // Best-effort: restrict v6 sockets to IPv6 traffic only. The
        // return value is deliberately ignored — presumably acceptable
        // on platforms where V6ONLY is already the default; confirm.
        int one = 1;
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
    }

    epoll_impl->fd_ = fd;

    // Register fd with epoll (edge-triggered mode)
    epoll_impl->desc_state_.fd = fd;
    {
        // Clear stale op slots under the lock before the reactor can
        // deliver events for the new fd.
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
        epoll_impl->desc_state_.read_op = nullptr;
        epoll_impl->desc_state_.write_op = nullptr;
        epoll_impl->desc_state_.connect_op = nullptr;
    }
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);

    return {};
}
808 :
809 : inline void
810 23747 : epoll_socket_service::close(io_object::handle& h)
811 : {
812 23747 : static_cast<epoll_socket*>(h.get())->close_socket();
813 23747 : }
814 :
815 : inline void
816 50438 : epoll_socket_service::post(epoll_op* op)
817 : {
818 50438 : state_->sched_.post(op);
819 50438 : }
820 :
821 : inline void
822 4937 : epoll_socket_service::work_started() noexcept
823 : {
824 4937 : state_->sched_.work_started();
825 4937 : }
826 :
827 : inline void
828 197 : epoll_socket_service::work_finished() noexcept
829 : {
830 197 : state_->sched_.work_finished();
831 197 : }
832 :
833 : } // namespace boost::corosio::detail
834 :
835 : #endif // BOOST_COROSIO_HAS_EPOLL
836 :
837 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
|