TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Steve Gerbino
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/cppalliance/corosio
9 : //
10 :
11 : #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
12 : #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
13 :
14 : #include <boost/corosio/timer.hpp>
15 : #include <boost/corosio/io_context.hpp>
16 : #include <boost/corosio/detail/scheduler_op.hpp>
17 : #include <boost/corosio/native/native_scheduler.hpp>
18 : #include <boost/corosio/detail/intrusive.hpp>
19 : #include <boost/corosio/detail/thread_local_ptr.hpp>
20 : #include <boost/capy/error.hpp>
21 : #include <boost/capy/ex/execution_context.hpp>
22 : #include <boost/capy/ex/executor_ref.hpp>
23 : #include <system_error>
24 :
25 : #include <atomic>
26 : #include <chrono>
27 : #include <coroutine>
28 : #include <cstddef>
29 : #include <limits>
30 : #include <mutex>
31 : #include <optional>
32 : #include <stop_token>
33 : #include <utility>
34 : #include <vector>
35 :
36 : namespace boost::corosio::detail {
37 :
38 : struct scheduler;
39 :
40 : /*
41 : Timer Service
42 : =============
43 :
44 : Data Structures
45 : ---------------
46 : waiter_node holds per-waiter state: coroutine handle, executor,
47 : error output, stop_token, embedded completion_op. Each concurrent
48 : co_await t.wait() allocates one waiter_node.
49 :
50 : timer_service::implementation holds per-timer state: expiry,
51 : heap index, and an intrusive_list of waiter_nodes. Multiple
52 : coroutines can wait on the same timer simultaneously.
53 :
54 : timer_service owns a min-heap of active timers, a free list
55 : of recycled impls, and a free list of recycled waiter_nodes. The
56 : heap is ordered by expiry time; the scheduler queries
57 : nearest_expiry() to set the epoll/timerfd timeout.
58 :
59 : Optimization Strategy
60 : ---------------------
61 : 1. Deferred heap insertion — expires_after() stores the expiry
62 : but does not insert into the heap. Insertion happens in wait().
63 : 2. Thread-local impl cache — single-slot per-thread cache.
64 : 3. Embedded completion_op — eliminates heap allocation per fire/cancel.
65 : 4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
66 : 5. might_have_pending_waits_ flag — skips lock when no wait issued.
67 : 6. Thread-local waiter cache — single-slot per-thread cache.
68 :
69 : Concurrency
70 : -----------
71 : stop_token callbacks can fire from any thread. The impl_
72 : pointer on waiter_node is used as a "still in list" marker.
73 : */
74 :
75 : struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
76 :
77 : inline void timer_service_invalidate_cache() noexcept;
78 :
79 : // timer_service class body — member function definitions are
80 : // out-of-class (after implementation and waiter_node are complete)
81 : class BOOST_COROSIO_DECL timer_service final
82 : : public capy::execution_context::service
83 : , public io_object::io_service
84 : {
85 : public:
86 : using clock_type = std::chrono::steady_clock;
87 : using time_point = clock_type::time_point;
88 :
89 : class callback
90 : {
91 : void* ctx_ = nullptr;
92 : void (*fn_)(void*) = nullptr;
93 :
94 : public:
95 HIT 407 : callback() = default;
96 407 : callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
97 :
98 : explicit operator bool() const noexcept
99 : {
100 : return fn_ != nullptr;
101 : }
102 8679 : void operator()() const
103 : {
104 8679 : if (fn_)
105 8679 : fn_(ctx_);
106 8679 : }
107 : };
108 :
109 : struct implementation;
110 :
111 : private:
112 : struct heap_entry
113 : {
114 : time_point time_;
115 : implementation* timer_;
116 : };
117 :
118 : scheduler* sched_ = nullptr;
119 : mutable std::mutex mutex_;
120 : std::vector<heap_entry> heap_;
121 : implementation* free_list_ = nullptr;
122 : waiter_node* waiter_free_list_ = nullptr;
123 : callback on_earliest_changed_;
124 : // Avoids mutex in nearest_expiry() and empty()
125 : mutable std::atomic<std::int64_t> cached_nearest_ns_{
126 : (std::numeric_limits<std::int64_t>::max)()};
127 :
128 : public:
129 407 : inline timer_service(capy::execution_context&, scheduler& sched)
130 407 : : sched_(&sched)
131 : {
132 407 : }
133 :
134 17442 : inline scheduler& get_scheduler() noexcept
135 : {
136 17442 : return *sched_;
137 : }
138 :
139 814 : ~timer_service() override = default;
140 :
141 : timer_service(timer_service const&) = delete;
142 : timer_service& operator=(timer_service const&) = delete;
143 :
144 407 : inline void set_on_earliest_changed(callback cb)
145 : {
146 407 : on_earliest_changed_ = cb;
147 407 : }
148 :
149 : inline bool empty() const noexcept
150 : {
151 : return cached_nearest_ns_.load(std::memory_order_acquire) ==
152 : (std::numeric_limits<std::int64_t>::max)();
153 : }
154 :
155 20568 : inline time_point nearest_expiry() const noexcept
156 : {
157 20568 : auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
158 20568 : return time_point(time_point::duration(ns));
159 : }
160 :
161 : inline void shutdown() override;
162 : inline io_object::implementation* construct() override;
163 : inline void destroy(io_object::implementation* p) override;
164 : inline void destroy_impl(implementation& impl);
165 : inline waiter_node* create_waiter();
166 : inline void destroy_waiter(waiter_node* w);
167 : inline std::size_t update_timer(implementation& impl, time_point new_time);
168 : inline void insert_waiter(implementation& impl, waiter_node* w);
169 : inline std::size_t cancel_timer(implementation& impl);
170 : inline void cancel_waiter(waiter_node* w);
171 : inline std::size_t cancel_one_waiter(implementation& impl);
172 : inline std::size_t process_expired();
173 :
174 : private:
175 111342 : inline void refresh_cached_nearest() noexcept
176 : {
177 111342 : auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
178 110865 : : heap_[0].time_.time_since_epoch().count();
179 111342 : cached_nearest_ns_.store(ns, std::memory_order_release);
180 111342 : }
181 :
182 : inline void remove_timer_impl(implementation& impl);
183 : inline void up_heap(std::size_t index);
184 : inline void down_heap(std::size_t index);
185 : inline void swap_heap(std::size_t i1, std::size_t i2);
186 : };
187 :
188 : struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
189 : : intrusive_list<waiter_node>::node
190 : {
191 : // Embedded completion op — avoids heap allocation per fire/cancel
192 : struct completion_op final : scheduler_op
193 : {
194 : waiter_node* waiter_ = nullptr;
195 :
196 : static void do_complete(
197 : void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
198 :
199 194 : completion_op() noexcept : scheduler_op(&do_complete) {}
200 :
201 : void operator()() override;
202 : void destroy() override;
203 : };
204 :
205 : // Per-waiter stop_token cancellation
206 : struct canceller
207 : {
208 : waiter_node* waiter_;
209 : void operator()() const;
210 : };
211 :
212 : // nullptr once removed from timer's waiter list (concurrency marker)
213 : timer_service::implementation* impl_ = nullptr;
214 : timer_service* svc_ = nullptr;
215 : std::coroutine_handle<> h_;
216 : capy::executor_ref d_;
217 : std::error_code* ec_out_ = nullptr;
218 : std::stop_token token_;
219 : std::optional<std::stop_callback<canceller>> stop_cb_;
220 : completion_op op_;
221 : std::error_code ec_value_;
222 : waiter_node* next_free_ = nullptr;
223 :
224 194 : waiter_node() noexcept
225 194 : {
226 194 : op_.waiter_ = this;
227 194 : }
228 : };
229 :
230 : struct timer_service::implementation final : timer::implementation
231 : {
232 : using clock_type = std::chrono::steady_clock;
233 : using time_point = clock_type::time_point;
234 : using duration = clock_type::duration;
235 :
236 : timer_service* svc_ = nullptr;
237 : intrusive_list<waiter_node> waiters_;
238 :
239 : // Free list linkage (reused when impl is on free_list)
240 : implementation* next_free_ = nullptr;
241 :
242 : inline explicit implementation(timer_service& svc) noexcept;
243 :
244 : inline std::coroutine_handle<> wait(
245 : std::coroutine_handle<>,
246 : capy::executor_ref,
247 : std::stop_token,
248 : std::error_code*) override;
249 : };
250 :
251 : // Thread-local caches avoid hot-path mutex acquisitions:
252 : // 1. Impl cache — single-slot, validated by comparing svc_
253 : // 2. Waiter cache — single-slot, no service affinity
254 : // All caches are cleared by timer_service_invalidate_cache() during shutdown.
255 :
256 : inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
257 : inline thread_local_ptr<waiter_node> tl_cached_waiter;
258 :
259 : inline timer_service::implementation*
260 9000 : try_pop_tl_cache(timer_service* svc) noexcept
261 : {
262 9000 : auto* impl = tl_cached_impl.get();
263 9000 : if (impl)
264 : {
265 8773 : tl_cached_impl.set(nullptr);
266 8773 : if (impl->svc_ == svc)
267 8773 : return impl;
268 : // Stale impl from a destroyed service
269 MIS 0 : delete impl;
270 : }
271 HIT 227 : return nullptr;
272 : }
273 :
274 : inline bool
275 8998 : try_push_tl_cache(timer_service::implementation* impl) noexcept
276 : {
277 8998 : if (!tl_cached_impl.get())
278 : {
279 8924 : tl_cached_impl.set(impl);
280 8924 : return true;
281 : }
282 74 : return false;
283 : }
284 :
285 : inline waiter_node*
286 8722 : try_pop_waiter_tl_cache() noexcept
287 : {
288 8722 : auto* w = tl_cached_waiter.get();
289 8722 : if (w)
290 : {
291 8526 : tl_cached_waiter.set(nullptr);
292 8526 : return w;
293 : }
294 196 : return nullptr;
295 : }
296 :
297 : inline bool
298 8712 : try_push_waiter_tl_cache(waiter_node* w) noexcept
299 : {
300 8712 : if (!tl_cached_waiter.get())
301 : {
302 8632 : tl_cached_waiter.set(w);
303 8632 : return true;
304 : }
305 80 : return false;
306 : }
307 :
308 : inline void
309 407 : timer_service_invalidate_cache() noexcept
310 : {
311 407 : delete tl_cached_impl.get();
312 407 : tl_cached_impl.set(nullptr);
313 :
314 407 : delete tl_cached_waiter.get();
315 407 : tl_cached_waiter.set(nullptr);
316 407 : }
317 :
318 : // timer_service out-of-class member function definitions
319 :
320 227 : inline timer_service::implementation::implementation(
321 227 : timer_service& svc) noexcept
322 227 : : svc_(&svc)
323 : {
324 227 : }
325 :
326 : inline void
327 407 : timer_service::shutdown()
328 : {
329 407 : timer_service_invalidate_cache();
330 :
331 : // Cancel waiting timers still in the heap.
332 : // Each waiter called work_started() in implementation::wait().
333 : // On IOCP the scheduler shutdown loop exits when outstanding_work_
334 : // reaches zero, so we must call work_finished() here to balance it.
335 : // On other backends this is harmless (their drain loops exit when
336 : // the queue is empty, not based on outstanding_work_).
337 409 : for (auto& entry : heap_)
338 : {
339 2 : auto* impl = entry.timer_;
340 4 : while (auto* w = impl->waiters_.pop_front())
341 : {
342 2 : w->stop_cb_.reset();
343 2 : auto h = std::exchange(w->h_, {});
344 2 : sched_->work_finished();
345 2 : if (h)
346 2 : h.destroy();
347 2 : delete w;
348 2 : }
349 2 : impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
350 2 : delete impl;
351 : }
352 407 : heap_.clear();
353 407 : cached_nearest_ns_.store(
354 : (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
355 :
356 : // Delete free-listed impls
357 481 : while (free_list_)
358 : {
359 74 : auto* next = free_list_->next_free_;
360 74 : delete free_list_;
361 74 : free_list_ = next;
362 : }
363 :
364 : // Delete free-listed waiters
365 485 : while (waiter_free_list_)
366 : {
367 78 : auto* next = waiter_free_list_->next_free_;
368 78 : delete waiter_free_list_;
369 78 : waiter_free_list_ = next;
370 : }
371 407 : }
372 :
373 : inline io_object::implementation*
374 9000 : timer_service::construct()
375 : {
376 9000 : implementation* impl = try_pop_tl_cache(this);
377 9000 : if (impl)
378 : {
379 8773 : impl->svc_ = this;
380 8773 : impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
381 8773 : impl->might_have_pending_waits_ = false;
382 8773 : return impl;
383 : }
384 :
385 227 : std::lock_guard lock(mutex_);
386 227 : if (free_list_)
387 : {
388 MIS 0 : impl = free_list_;
389 0 : free_list_ = impl->next_free_;
390 0 : impl->next_free_ = nullptr;
391 0 : impl->svc_ = this;
392 0 : impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
393 0 : impl->might_have_pending_waits_ = false;
394 : }
395 : else
396 : {
397 HIT 227 : impl = new implementation(*this);
398 : }
399 227 : return impl;
400 227 : }
401 :
402 : inline void
403 8998 : timer_service::destroy(io_object::implementation* p)
404 : {
405 8998 : destroy_impl(static_cast<implementation&>(*p));
406 8998 : }
407 :
408 : inline void
409 8998 : timer_service::destroy_impl(implementation& impl)
410 : {
411 8998 : cancel_timer(impl);
412 :
413 8998 : if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
414 : {
415 MIS 0 : std::lock_guard lock(mutex_);
416 0 : remove_timer_impl(impl);
417 0 : refresh_cached_nearest();
418 0 : }
419 :
420 HIT 8998 : if (try_push_tl_cache(&impl))
421 8924 : return;
422 :
423 74 : std::lock_guard lock(mutex_);
424 74 : impl.next_free_ = free_list_;
425 74 : free_list_ = &impl;
426 74 : }
427 :
428 : inline waiter_node*
429 8722 : timer_service::create_waiter()
430 : {
431 8722 : if (auto* w = try_pop_waiter_tl_cache())
432 8526 : return w;
433 :
434 196 : std::lock_guard lock(mutex_);
435 196 : if (waiter_free_list_)
436 : {
437 2 : auto* w = waiter_free_list_;
438 2 : waiter_free_list_ = w->next_free_;
439 2 : w->next_free_ = nullptr;
440 2 : return w;
441 : }
442 :
443 194 : return new waiter_node();
444 196 : }
445 :
446 : inline void
447 8712 : timer_service::destroy_waiter(waiter_node* w)
448 : {
449 8712 : if (try_push_waiter_tl_cache(w))
450 8632 : return;
451 :
452 80 : std::lock_guard lock(mutex_);
453 80 : w->next_free_ = waiter_free_list_;
454 80 : waiter_free_list_ = w;
455 80 : }
456 :
457 : inline std::size_t
458 6 : timer_service::update_timer(implementation& impl, time_point new_time)
459 : {
460 : bool in_heap =
461 6 : (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
462 6 : if (!in_heap && impl.waiters_.empty())
463 MIS 0 : return 0;
464 :
465 HIT 6 : bool notify = false;
466 6 : intrusive_list<waiter_node> canceled;
467 :
468 : {
469 6 : std::lock_guard lock(mutex_);
470 :
471 16 : while (auto* w = impl.waiters_.pop_front())
472 : {
473 10 : w->impl_ = nullptr;
474 10 : canceled.push_back(w);
475 10 : }
476 :
477 6 : if (impl.heap_index_ < heap_.size())
478 : {
479 6 : time_point old_time = heap_[impl.heap_index_].time_;
480 6 : heap_[impl.heap_index_].time_ = new_time;
481 :
482 6 : if (new_time < old_time)
483 6 : up_heap(impl.heap_index_);
484 : else
485 MIS 0 : down_heap(impl.heap_index_);
486 :
487 HIT 6 : notify = (impl.heap_index_ == 0);
488 : }
489 :
490 6 : refresh_cached_nearest();
491 6 : }
492 :
493 6 : std::size_t count = 0;
494 16 : while (auto* w = canceled.pop_front())
495 : {
496 10 : w->ec_value_ = make_error_code(capy::error::canceled);
497 10 : sched_->post(&w->op_);
498 10 : ++count;
499 10 : }
500 :
501 6 : if (notify)
502 6 : on_earliest_changed_();
503 :
504 6 : return count;
505 : }
506 :
507 : inline void
508 8722 : timer_service::insert_waiter(implementation& impl, waiter_node* w)
509 : {
510 8722 : bool notify = false;
511 : {
512 8722 : std::lock_guard lock(mutex_);
513 8722 : if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
514 : {
515 8700 : impl.heap_index_ = heap_.size();
516 8700 : heap_.push_back({impl.expiry_, &impl});
517 8700 : up_heap(heap_.size() - 1);
518 8700 : notify = (impl.heap_index_ == 0);
519 8700 : refresh_cached_nearest();
520 : }
521 8722 : impl.waiters_.push_back(w);
522 8722 : }
523 8722 : if (notify)
524 8673 : on_earliest_changed_();
525 8722 : }
526 :
527 : inline std::size_t
528 9006 : timer_service::cancel_timer(implementation& impl)
529 : {
530 9006 : if (!impl.might_have_pending_waits_)
531 8982 : return 0;
532 :
533 : // Not in heap and no waiters — just clear the flag
534 24 : if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
535 MIS 0 : impl.waiters_.empty())
536 : {
537 0 : impl.might_have_pending_waits_ = false;
538 0 : return 0;
539 : }
540 :
541 HIT 24 : intrusive_list<waiter_node> canceled;
542 :
543 : {
544 24 : std::lock_guard lock(mutex_);
545 24 : remove_timer_impl(impl);
546 52 : while (auto* w = impl.waiters_.pop_front())
547 : {
548 28 : w->impl_ = nullptr;
549 28 : canceled.push_back(w);
550 28 : }
551 24 : refresh_cached_nearest();
552 24 : }
553 :
554 24 : impl.might_have_pending_waits_ = false;
555 :
556 24 : std::size_t count = 0;
557 52 : while (auto* w = canceled.pop_front())
558 : {
559 28 : w->ec_value_ = make_error_code(capy::error::canceled);
560 28 : sched_->post(&w->op_);
561 28 : ++count;
562 28 : }
563 :
564 24 : return count;
565 : }
566 :
567 : inline void
568 30 : timer_service::cancel_waiter(waiter_node* w)
569 : {
570 : {
571 30 : std::lock_guard lock(mutex_);
572 : // Already removed by cancel_timer or process_expired
573 30 : if (!w->impl_)
574 MIS 0 : return;
575 HIT 30 : auto* impl = w->impl_;
576 30 : w->impl_ = nullptr;
577 30 : impl->waiters_.remove(w);
578 30 : if (impl->waiters_.empty())
579 : {
580 28 : remove_timer_impl(*impl);
581 28 : impl->might_have_pending_waits_ = false;
582 : }
583 30 : refresh_cached_nearest();
584 30 : }
585 :
586 30 : w->ec_value_ = make_error_code(capy::error::canceled);
587 30 : sched_->post(&w->op_);
588 : }
589 :
590 : inline std::size_t
591 2 : timer_service::cancel_one_waiter(implementation& impl)
592 : {
593 2 : if (!impl.might_have_pending_waits_)
594 MIS 0 : return 0;
595 :
596 HIT 2 : waiter_node* w = nullptr;
597 :
598 : {
599 2 : std::lock_guard lock(mutex_);
600 2 : w = impl.waiters_.pop_front();
601 2 : if (!w)
602 MIS 0 : return 0;
603 HIT 2 : w->impl_ = nullptr;
604 2 : if (impl.waiters_.empty())
605 : {
606 MIS 0 : remove_timer_impl(impl);
607 0 : impl.might_have_pending_waits_ = false;
608 : }
609 HIT 2 : refresh_cached_nearest();
610 2 : }
611 :
612 2 : w->ec_value_ = make_error_code(capy::error::canceled);
613 2 : sched_->post(&w->op_);
614 2 : return 1;
615 : }
616 :
617 : inline std::size_t
618 102580 : timer_service::process_expired()
619 : {
620 102580 : intrusive_list<waiter_node> expired;
621 :
622 : {
623 102580 : std::lock_guard lock(mutex_);
624 102580 : auto now = clock_type::now();
625 :
626 111226 : while (!heap_.empty() && heap_[0].time_ <= now)
627 : {
628 8646 : implementation* t = heap_[0].timer_;
629 8646 : remove_timer_impl(*t);
630 17296 : while (auto* w = t->waiters_.pop_front())
631 : {
632 8650 : w->impl_ = nullptr;
633 8650 : w->ec_value_ = {};
634 8650 : expired.push_back(w);
635 8650 : }
636 8646 : t->might_have_pending_waits_ = false;
637 : }
638 :
639 102580 : refresh_cached_nearest();
640 102580 : }
641 :
642 102580 : std::size_t count = 0;
643 111230 : while (auto* w = expired.pop_front())
644 : {
645 8650 : sched_->post(&w->op_);
646 8650 : ++count;
647 8650 : }
648 :
649 102580 : return count;
650 : }
651 :
652 : inline void
653 8698 : timer_service::remove_timer_impl(implementation& impl)
654 : {
655 8698 : std::size_t index = impl.heap_index_;
656 8698 : if (index >= heap_.size())
657 MIS 0 : return; // Not in heap
658 :
659 HIT 8698 : if (index == heap_.size() - 1)
660 : {
661 : // Last element, just pop
662 136 : impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
663 136 : heap_.pop_back();
664 : }
665 : else
666 : {
667 : // Swap with last and reheapify
668 8562 : swap_heap(index, heap_.size() - 1);
669 8562 : impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
670 8562 : heap_.pop_back();
671 :
672 8562 : if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
673 MIS 0 : up_heap(index);
674 : else
675 HIT 8562 : down_heap(index);
676 : }
677 : }
678 :
679 : inline void
680 8706 : timer_service::up_heap(std::size_t index)
681 : {
682 17247 : while (index > 0)
683 : {
684 8568 : std::size_t parent = (index - 1) / 2;
685 8568 : if (!(heap_[index].time_ < heap_[parent].time_))
686 27 : break;
687 8541 : swap_heap(index, parent);
688 8541 : index = parent;
689 : }
690 8706 : }
691 :
692 : inline void
693 8562 : timer_service::down_heap(std::size_t index)
694 : {
695 8562 : std::size_t child = index * 2 + 1;
696 8562 : while (child < heap_.size())
697 : {
698 6 : std::size_t min_child = (child + 1 == heap_.size() ||
699 MIS 0 : heap_[child].time_ < heap_[child + 1].time_)
700 HIT 6 : ? child
701 6 : : child + 1;
702 :
703 6 : if (heap_[index].time_ < heap_[min_child].time_)
704 6 : break;
705 :
706 MIS 0 : swap_heap(index, min_child);
707 0 : index = min_child;
708 0 : child = index * 2 + 1;
709 : }
710 HIT 8562 : }
711 :
712 : inline void
713 17103 : timer_service::swap_heap(std::size_t i1, std::size_t i2)
714 : {
715 17103 : heap_entry tmp = heap_[i1];
716 17103 : heap_[i1] = heap_[i2];
717 17103 : heap_[i2] = tmp;
718 17103 : heap_[i1].timer_->heap_index_ = i1;
719 17103 : heap_[i2].timer_->heap_index_ = i2;
720 17103 : }
721 :
722 : // waiter_node out-of-class member function definitions
723 :
724 : inline void
725 30 : waiter_node::canceller::operator()() const
726 : {
727 30 : waiter_->svc_->cancel_waiter(waiter_);
728 30 : }
729 :
730 : inline void
731 MIS 0 : waiter_node::completion_op::do_complete(
732 : [[maybe_unused]] void* owner,
733 : scheduler_op* base,
734 : std::uint32_t,
735 : std::uint32_t)
736 : {
737 : // owner is always non-null here. The destroy path (owner == nullptr)
738 : // is unreachable because completion_op overrides destroy() directly,
739 : // bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
740 0 : BOOST_COROSIO_ASSERT(owner);
741 0 : static_cast<completion_op*>(base)->operator()();
742 0 : }
743 :
744 : inline void
745 HIT 8712 : waiter_node::completion_op::operator()()
746 : {
747 8712 : auto* w = waiter_;
748 8712 : w->stop_cb_.reset();
749 8712 : if (w->ec_out_)
750 8712 : *w->ec_out_ = w->ec_value_;
751 :
752 8712 : auto h = w->h_;
753 8712 : auto d = w->d_;
754 8712 : auto* svc = w->svc_;
755 8712 : auto& sched = svc->get_scheduler();
756 :
757 8712 : svc->destroy_waiter(w);
758 :
759 8712 : d.post(h);
760 8712 : sched.work_finished();
761 8712 : }
762 :
763 : inline void
764 8 : waiter_node::completion_op::destroy()
765 : {
766 : // Called during scheduler shutdown drain when this completion_op is
767 : // in the scheduler's ready queue (posted by cancel_timer() or
768 : // process_expired()). Balances the work_started() from
769 : // implementation::wait(). The scheduler drain loop separately
770 : // balances the work_started() from post(). On IOCP both decrements
771 : // are required for outstanding_work_ to reach zero; on other
772 : // backends this is harmless.
773 : //
774 : // This override also prevents scheduler_op::destroy() from calling
775 : // do_complete(nullptr, ...). See also: timer_service::shutdown()
776 : // which drains waiters still in the timer heap (the other path).
777 8 : auto* w = waiter_;
778 8 : w->stop_cb_.reset();
779 8 : auto h = std::exchange(w->h_, {});
780 8 : auto& sched = w->svc_->get_scheduler();
781 8 : delete w;
782 8 : sched.work_finished();
783 8 : if (h)
784 8 : h.destroy();
785 8 : }
786 :
787 : inline std::coroutine_handle<>
788 8722 : timer_service::implementation::wait(
789 : std::coroutine_handle<> h,
790 : capy::executor_ref d,
791 : std::stop_token token,
792 : std::error_code* ec)
793 : {
794 : // Already-expired fast path — no waiter_node, no mutex.
795 : // Post instead of dispatch so the coroutine yields to the
796 : // scheduler, allowing other queued work to run.
797 8722 : if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
798 : {
799 8700 : if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
800 : {
801 MIS 0 : if (ec)
802 0 : *ec = {};
803 0 : d.post(h);
804 0 : return std::noop_coroutine();
805 : }
806 : }
807 :
808 HIT 8722 : auto* w = svc_->create_waiter();
809 8722 : w->impl_ = this;
810 8722 : w->svc_ = svc_;
811 8722 : w->h_ = h;
812 8722 : w->d_ = d;
813 8722 : w->token_ = std::move(token);
814 8722 : w->ec_out_ = ec;
815 :
816 8722 : svc_->insert_waiter(*this, w);
817 8722 : might_have_pending_waits_ = true;
818 8722 : svc_->get_scheduler().work_started();
819 :
820 8722 : if (w->token_.stop_possible())
821 48 : w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
822 :
823 8722 : return std::noop_coroutine();
824 : }
825 :
826 : // Free functions
827 :
828 : struct timer_service_access
829 : {
830 9000 : static native_scheduler& get_scheduler(io_context& ctx) noexcept
831 : {
832 9000 : return static_cast<native_scheduler&>(*ctx.sched_);
833 : }
834 : };
835 :
836 : // Bypass find_service() mutex by reading the scheduler's cached pointer
837 : inline io_object::io_service&
838 9000 : timer_service_direct(capy::execution_context& ctx) noexcept
839 : {
840 9000 : return *timer_service_access::get_scheduler(static_cast<io_context&>(ctx))
841 9000 : .timer_svc_;
842 : }
843 :
844 : inline std::size_t
845 6 : timer_service_update_expiry(timer::implementation& base)
846 : {
847 6 : auto& impl = static_cast<timer_service::implementation&>(base);
848 6 : return impl.svc_->update_timer(impl, impl.expiry_);
849 : }
850 :
851 : inline std::size_t
852 8 : timer_service_cancel(timer::implementation& base) noexcept
853 : {
854 8 : auto& impl = static_cast<timer_service::implementation&>(base);
855 8 : return impl.svc_->cancel_timer(impl);
856 : }
857 :
858 : inline std::size_t
859 2 : timer_service_cancel_one(timer::implementation& base) noexcept
860 : {
861 2 : auto& impl = static_cast<timer_service::implementation&>(base);
862 2 : return impl.svc_->cancel_one_waiter(impl);
863 : }
864 :
865 : inline timer_service&
866 407 : get_timer_service(capy::execution_context& ctx, scheduler& sched)
867 : {
868 407 : return ctx.make_service<timer_service>(sched);
869 : }
870 :
871 : } // namespace boost::corosio::detail
872 :
873 : #endif
|