include/boost/corosio/native/detail/epoll/epoll_acceptor_service.hpp

80.6% Lines (203/252) 95.8% Functions (23/24)
include/boost/corosio/native/detail/epoll/epoll_acceptor_service.hpp
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/acceptor_service.hpp>
20
21 #include <boost/corosio/native/detail/epoll/epoll_acceptor.hpp>
22 #include <boost/corosio/native/detail/epoll/epoll_socket_service.hpp>
23 #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
24
25 #include <boost/corosio/native/detail/endpoint_convert.hpp>
26 #include <boost/corosio/detail/dispatch_coro.hpp>
27 #include <boost/corosio/native/detail/make_err.hpp>
28
29 #include <memory>
30 #include <mutex>
31 #include <unordered_map>
32 #include <utility>
33
34 #include <errno.h>
35 #include <netinet/in.h>
36 #include <sys/epoll.h>
37 #include <sys/socket.h>
38 #include <unistd.h>
39
40 namespace boost::corosio::detail {
41
42 /** State for epoll acceptor service. */
43 class epoll_acceptor_state
44 {
45 public:
46 239 explicit epoll_acceptor_state(epoll_scheduler& sched) noexcept
47 239 : sched_(sched)
48 {
49 239 }
50
51 epoll_scheduler& sched_;
52 std::mutex mutex_;
53 intrusive_list<epoll_acceptor> acceptor_list_;
54 std::unordered_map<epoll_acceptor*, std::shared_ptr<epoll_acceptor>>
55 acceptor_ptrs_;
56 };
57
/** epoll acceptor service implementation.

    Inherits from acceptor_service to enable runtime polymorphism.
    Uses key_type = acceptor_service for service lookup.
*/
class BOOST_COROSIO_DECL epoll_acceptor_service final : public acceptor_service
{
public:
    explicit epoll_acceptor_service(capy::execution_context& ctx);
    ~epoll_acceptor_service() override;

    // Non-copyable: owns the service-wide acceptor registry.
    epoll_acceptor_service(epoll_acceptor_service const&) = delete;
    epoll_acceptor_service& operator=(epoll_acceptor_service const&) = delete;

    /// Close every live acceptor at context shutdown.
    void shutdown() override;

    /// Create a new acceptor implementation and register it with the service.
    io_object::implementation* construct() override;
    /// Close an acceptor's socket and remove it from the registry.
    void destroy(io_object::implementation*) override;
    /// Close the acceptor referenced by the handle (implementation kept alive).
    void close(io_object::handle&) override;
    /// Open a non-blocking (SOCK_NONBLOCK | SOCK_CLOEXEC) acceptor socket.
    std::error_code open_acceptor_socket(
        tcp_acceptor::implementation& impl,
        int family,
        int type,
        int protocol) override;
    /// Bind the acceptor's socket and cache its (possibly ephemeral) endpoint.
    std::error_code
    bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
    /// Start listening and register the fd with epoll.
    std::error_code
    listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;

    /// Scheduler shared by all acceptors created through this service.
    epoll_scheduler& scheduler() const noexcept
    {
        return state_->sched_;
    }
    /// Queue a completed/cancelled op on the scheduler.
    void post(epoll_op* op);
    /// Outstanding-work accounting, forwarded to the scheduler.
    void work_started() noexcept;
    void work_finished() noexcept;

    /** Get the socket service for creating peer sockets during accept. */
    epoll_socket_service* socket_service() const noexcept;

private:
    capy::execution_context& ctx_;
    std::unique_ptr<epoll_acceptor_state> state_;
};
102
103 //--------------------------------------------------------------------------
104 //
105 // Implementation
106 //
107 //--------------------------------------------------------------------------
108
109 inline void
110 6 epoll_accept_op::cancel() noexcept
111 {
112 6 if (acceptor_impl_)
113 6 acceptor_impl_->cancel_single_op(*this);
114 else
115 request_cancel();
116 6 }
117
// Completion handler for an accept operation. Runs after the op is
// posted to the scheduler (success, error, or cancellation). Sets
// *ec_out, constructs the peer socket implementation on success,
// and finally resumes the awaiting coroutine.
inline void
epoll_accept_op::operator()()
{
    // Disarm the stop callback first so a late stop request cannot race
    // with completion.
    stop_cb.reset();

    // A queued completion resets the scheduler's inline-execution budget.
    static_cast<epoll_acceptor*>(acceptor_impl_)
        ->service()
        .scheduler()
        .reset_inline_budget();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cancellation takes precedence over any saved errno.
    if (cancelled.load(std::memory_order_acquire))
        *ec_out = capy::error::canceled;
    else if (errn != 0)
        *ec_out = make_err(errn);
    else
        *ec_out = {};

    // Set up the peer socket on success
    if (success && accepted_fd >= 0 && acceptor_impl_)
    {
        auto* socket_svc = static_cast<epoll_acceptor*>(acceptor_impl_)
            ->service()
            .socket_service();
        if (socket_svc)
        {
            // Hand the accepted fd to a freshly constructed socket impl.
            auto& impl = static_cast<epoll_socket&>(*socket_svc->construct());
            impl.set_socket(accepted_fd);

            impl.desc_state_.fd = accepted_fd;
            {
                // Clear any stale op pointers before epoll registration.
                std::lock_guard lock(impl.desc_state_.mutex);
                impl.desc_state_.read_op = nullptr;
                impl.desc_state_.write_op = nullptr;
                impl.desc_state_.connect_op = nullptr;
            }
            socket_svc->scheduler().register_descriptor(
                accepted_fd, &impl.desc_state_);

            impl.set_endpoints(
                static_cast<epoll_acceptor*>(acceptor_impl_)->local_endpoint(),
                from_sockaddr(peer_storage));

            if (impl_out)
                *impl_out = &impl;
            // Ownership of the fd moved to impl; forget it here.
            accepted_fd = -1;
        }
        else
        {
            // No socket service — treat as error
            *ec_out = make_err(ENOENT);
            success = false;
        }
    }

    // On failure (or detached op), make sure the accepted fd does not leak
    // and the caller sees a null implementation.
    if (!success || !acceptor_impl_)
    {
        if (accepted_fd >= 0)
        {
            ::close(accepted_fd);
            accepted_fd = -1;
        }
        if (impl_out)
            *impl_out = nullptr;
    }

    // Move to stack before resuming. See epoll_op::operator()() for rationale.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    auto prevent_premature_destruction = std::move(impl_ptr);
    dispatch_coro(saved_ex, saved_h).resume();
}
191
192 82 inline epoll_acceptor::epoll_acceptor(epoll_acceptor_service& svc) noexcept
193 82 : svc_(svc)
194 {
195 82 }
196
// Initiate an asynchronous accept.
//
// Tries a non-blocking accept4() immediately. Three outcomes:
//  - immediate success with inline budget: peer socket is built here and
//    the awaiting coroutine is resumed inline;
//  - immediate success without budget (or EAGAIN): completion is posted
//    to the scheduler / parked on the descriptor's read slot;
//  - hard error: completion posted with the errno.
// Returns the coroutine handle to resume (or noop_coroutine()).
inline std::coroutine_handle<>
epoll_acceptor::accept(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    std::stop_token token,
    std::error_code* ec,
    io_object::implementation** impl_out)
{
    // A single reusable op per acceptor: only one accept may be pending.
    auto& op = acc_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.impl_out = impl_out;
    op.fd = fd_;
    op.start(token, this);

    sockaddr_storage peer_storage{};
    socklen_t addrlen = sizeof(peer_storage);
    int accepted;
    // Retry on EINTR only; all other failures fall through below.
    do
    {
        accepted = ::accept4(
            fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
            SOCK_NONBLOCK | SOCK_CLOEXEC);
    }
    while (accepted < 0 && errno == EINTR);

    if (accepted >= 0)
    {
        {
            // The edge-triggered readiness was consumed by this accept.
            std::lock_guard lock(desc_state_.mutex);
            desc_state_.read_ready = false;
        }

        // Fast path: complete inline if the scheduler still grants budget.
        if (svc_.scheduler().try_consume_inline_budget())
        {
            auto* socket_svc = svc_.socket_service();
            if (socket_svc)
            {
                // Mirror of the setup done in epoll_accept_op::operator()().
                auto& impl =
                    static_cast<epoll_socket&>(*socket_svc->construct());
                impl.set_socket(accepted);

                impl.desc_state_.fd = accepted;
                {
                    std::lock_guard lock(impl.desc_state_.mutex);
                    impl.desc_state_.read_op = nullptr;
                    impl.desc_state_.write_op = nullptr;
                    impl.desc_state_.connect_op = nullptr;
                }
                socket_svc->scheduler().register_descriptor(
                    accepted, &impl.desc_state_);

                impl.set_endpoints(
                    local_endpoint_, from_sockaddr(peer_storage));

                *ec = {};
                if (impl_out)
                    *impl_out = &impl;
            }
            else
            {
                // No socket service available: don't leak the accepted fd.
                ::close(accepted);
                *ec = make_err(ENOENT);
                if (impl_out)
                    *impl_out = nullptr;
            }
            // NOTE(review): op.stop_cb is not reset on this inline path
            // (the posted path resets it in operator()()); presumably
            // op.reset() on the next accept clears it — confirm.
            return dispatch_coro(ex, h);
        }

        // Budget exhausted: hand the fd to the op and post the completion.
        op.accepted_fd = accepted;
        op.peer_storage = peer_storage;
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        // Would block: keep the impl alive and count outstanding work.
        op.impl_ptr = shared_from_this();
        svc_.work_started();

        std::lock_guard lock(desc_state_.mutex);
        bool io_done = false;
        // Readiness may have arrived between accept4() and taking the lock.
        if (desc_state_.read_ready)
        {
            desc_state_.read_ready = false;
            op.perform_io();
            io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
            if (!io_done)
                op.errn = 0;
        }

        if (io_done || op.cancelled.load(std::memory_order_acquire))
        {
            svc_.post(&op);
            svc_.work_finished();
        }
        else
        {
            // Park the op; the reactor will post it when the fd is readable.
            desc_state_.read_op = &op;
        }
        return std::noop_coroutine();
    }

    // Hard error from accept4(): report it through the posted completion.
    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    // completion is always posted to scheduler queue, never inline.
    return std::noop_coroutine();
}
310
311 inline void
312 2 epoll_acceptor::cancel() noexcept
313 {
314 2 cancel_single_op(acc_);
315 2 }
316
317 inline void
318 8 epoll_acceptor::cancel_single_op(epoll_op& op) noexcept
319 {
320 8 auto self = weak_from_this().lock();
321 8 if (!self)
322 return;
323
324 8 op.request_cancel();
325
326 8 epoll_op* claimed = nullptr;
327 {
328 8 std::lock_guard lock(desc_state_.mutex);
329 8 if (desc_state_.read_op == &op)
330 7 claimed = std::exchange(desc_state_.read_op, nullptr);
331 8 }
332 8 if (claimed)
333 {
334 7 op.impl_ptr = self;
335 7 svc_.post(&op);
336 7 svc_.work_finished();
337 }
338 8 }
339
// Close the listening socket and flush any pending op.
//
// Order matters: claim the parked op under the lock, post it, pin the
// impl for any reactor event already in flight, deregister from epoll
// before closing the fd, then reset cached state.
inline void
epoll_acceptor::close_socket() noexcept
{
    // If the weak ref is dead we are inside destruction; skip op handling.
    auto self = weak_from_this().lock();
    if (self)
    {
        acc_.request_cancel();

        epoll_op* claimed = nullptr;
        {
            std::lock_guard lock(desc_state_.mutex);
            claimed = std::exchange(desc_state_.read_op, nullptr);
            desc_state_.read_ready = false;
            desc_state_.write_ready = false;
        }

        if (claimed)
        {
            // The parked op must still complete (as cancelled).
            acc_.impl_ptr = self;
            svc_.post(&acc_);
            svc_.work_finished();
        }

        // If the reactor already queued an event for this descriptor,
        // keep the impl alive until that event drains.
        if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
            desc_state_.impl_ref_ = self;
    }

    if (fd_ >= 0)
    {
        // Deregister before close so epoll never sees a stale fd.
        if (desc_state_.registered_events != 0)
            svc_.scheduler().deregister_descriptor(fd_);
        ::close(fd_);
        fd_ = -1;
    }

    desc_state_.fd = -1;
    desc_state_.registered_events = 0;

    local_endpoint_ = endpoint{};
}
380
381 239 inline epoll_acceptor_service::epoll_acceptor_service(
382 239 capy::execution_context& ctx)
383 239 : ctx_(ctx)
384 239 , state_(
385 std::make_unique<epoll_acceptor_state>(
386 239 ctx.use_service<epoll_scheduler>()))
387 {
388 239 }
389
390 478 inline epoll_acceptor_service::~epoll_acceptor_service() {}
391
392 inline void
393 239 epoll_acceptor_service::shutdown()
394 {
395 239 std::lock_guard lock(state_->mutex_);
396
397 239 while (auto* impl = state_->acceptor_list_.pop_front())
398 impl->close_socket();
399
400 // Don't clear acceptor_ptrs_ here — same rationale as
401 // epoll_socket_service::shutdown(). Let ~state_ release ptrs
402 // after scheduler shutdown has drained all queued ops.
403 239 }
404
405 inline io_object::implementation*
406 82 epoll_acceptor_service::construct()
407 {
408 82 auto impl = std::make_shared<epoll_acceptor>(*this);
409 82 auto* raw = impl.get();
410
411 82 std::lock_guard lock(state_->mutex_);
412 82 state_->acceptor_list_.push_back(raw);
413 82 state_->acceptor_ptrs_.emplace(raw, std::move(impl));
414
415 82 return raw;
416 82 }
417
418 inline void
419 82 epoll_acceptor_service::destroy(io_object::implementation* impl)
420 {
421 82 auto* epoll_impl = static_cast<epoll_acceptor*>(impl);
422 82 epoll_impl->close_socket();
423 82 std::lock_guard lock(state_->mutex_);
424 82 state_->acceptor_list_.remove(epoll_impl);
425 82 state_->acceptor_ptrs_.erase(epoll_impl);
426 82 }
427
428 inline void
429 163 epoll_acceptor_service::close(io_object::handle& h)
430 {
431 163 static_cast<epoll_acceptor*>(h.get())->close_socket();
432 163 }
433
434 inline std::error_code
435 79 epoll_acceptor::set_option(
436 int level, int optname, void const* data, std::size_t size) noexcept
437 {
438 79 if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
439 0)
440 return make_err(errno);
441 79 return {};
442 }
443
444 inline std::error_code
445 epoll_acceptor::get_option(
446 int level, int optname, void* data, std::size_t* size) const noexcept
447 {
448 socklen_t len = static_cast<socklen_t>(*size);
449 if (::getsockopt(fd_, level, optname, data, &len) != 0)
450 return make_err(errno);
451 *size = static_cast<std::size_t>(len);
452 return {};
453 }
454
455 inline std::error_code
456 81 epoll_acceptor_service::open_acceptor_socket(
457 tcp_acceptor::implementation& impl, int family, int type, int protocol)
458 {
459 81 auto* epoll_impl = static_cast<epoll_acceptor*>(&impl);
460 81 epoll_impl->close_socket();
461
462 81 int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
463 81 if (fd < 0)
464 return make_err(errno);
465
466 81 if (family == AF_INET6)
467 {
468 8 int val = 0; // dual-stack default
469 8 ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
470 }
471
472 81 epoll_impl->fd_ = fd;
473
474 // Set up descriptor state but do NOT register with epoll yet
475 81 epoll_impl->desc_state_.fd = fd;
476 {
477 81 std::lock_guard lock(epoll_impl->desc_state_.mutex);
478 81 epoll_impl->desc_state_.read_op = nullptr;
479 81 }
480
481 81 return {};
482 }
483
484 inline std::error_code
485 80 epoll_acceptor_service::bind_acceptor(
486 tcp_acceptor::implementation& impl, endpoint ep)
487 {
488 80 auto* epoll_impl = static_cast<epoll_acceptor*>(&impl);
489 80 int fd = epoll_impl->fd_;
490
491 80 sockaddr_storage storage{};
492 80 socklen_t addrlen = detail::to_sockaddr(ep, storage);
493 80 if (::bind(fd, reinterpret_cast<sockaddr*>(&storage), addrlen) < 0)
494 3 return make_err(errno);
495
496 // Cache local endpoint (resolves ephemeral port)
497 77 sockaddr_storage local{};
498 77 socklen_t local_len = sizeof(local);
499 77 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local), &local_len) == 0)
500 77 epoll_impl->set_local_endpoint(detail::from_sockaddr(local));
501
502 77 return {};
503 }
504
505 inline std::error_code
506 77 epoll_acceptor_service::listen_acceptor(
507 tcp_acceptor::implementation& impl, int backlog)
508 {
509 77 auto* epoll_impl = static_cast<epoll_acceptor*>(&impl);
510 77 int fd = epoll_impl->fd_;
511
512 77 if (::listen(fd, backlog) < 0)
513 return make_err(errno);
514
515 // Register fd with epoll (edge-triggered mode)
516 77 scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
517
518 77 return {};
519 }
520
521 inline void
522 11 epoll_acceptor_service::post(epoll_op* op)
523 {
524 11 state_->sched_.post(op);
525 11 }
526
527 inline void
528 4740 epoll_acceptor_service::work_started() noexcept
529 {
530 4740 state_->sched_.work_started();
531 4740 }
532
533 inline void
534 9 epoll_acceptor_service::work_finished() noexcept
535 {
536 9 state_->sched_.work_finished();
537 9 }
538
539 inline epoll_socket_service*
540 4733 epoll_acceptor_service::socket_service() const noexcept
541 {
542 4733 auto* svc = ctx_.find_service<detail::socket_service>();
543 4733 return svc ? dynamic_cast<epoll_socket_service*>(svc) : nullptr;
544 }
545
546 } // namespace boost::corosio::detail
547
548 #endif // BOOST_COROSIO_HAS_EPOLL
549
550 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
551