5 #include <boost/asio/steady_timer.hpp> 6 #include <boost/fiber/mutex.hpp> 7 #include <boost/fiber/operations.hpp> 8 #include <boost/fiber/scheduler.hpp> 10 #include "base/logging.h" 11 #include "base/walltime.h" 13 #include <glog/raw_logging.h> 15 #include "base/walltime.h" 16 #include "util/asio/io_context.h" 20 using fibers_ext::BlockingCounter;
21 using namespace boost;
// Helper that grants the scheduler write access to IoFiberProperties'
// private timestamp fields. Only the ctor initializer list and the two
// setters are visible in this chunk - the struct header precedes it.
29 : io_props(*static_cast<IoFiberProperties*>(props)) {}
// Stamp taken by pick_next() when the fiber is switched in.
31 void set_resume_ts(uint64_t ts) { io_props.resume_ts_ = ts; }
// Stamp taken by awakened() when the fiber enters a ready queue.
32 void set_awaken_ts(uint64_t ts) { io_props.awaken_ts_ = ts; }
36 constexpr
unsigned MAIN_NICE_LEVEL = 0;
37 constexpr
unsigned DISPATCH_LEVEL = IoFiberProperties::NUM_NICE_LEVELS;
39 constexpr
unsigned NOTIFY_GUARD_SHIFT = 16;
40 constexpr chrono::steady_clock::time_point STEADY_PT_MAX = chrono::steady_clock::time_point::max();
42 inline int64_t delta_micros(
const chrono::steady_clock::time_point tp) {
43 static_assert(8 ==
sizeof(chrono::steady_clock::time_point),
"");
44 return chrono::duration_cast<chrono::microseconds>(tp - chrono::steady_clock::now()).count();
// Fiber-scheduling algorithm bridging boost.fibers with an asio io_context:
// worker fibers are kept in one ready queue per nice level, the dispatcher
// context is parked in its own queue (DISPATCH_LEVEL), and the fiber running
// MainLoop() drives the io_context.
// NOTE(review): this chunk is a lossy extraction - several member
// declarations (e.g. mask_) and statements inside the inline methods are
// missing; comments below mark the visible gaps.
47 class AsioScheduler final :
public fibers::algo::algorithm_with_properties<IoFiberProperties> {
49 using ready_queue_type = fibers::scheduler::ready_queue_type;
50 std::shared_ptr<asio::io_context> io_context_;
// Armed by suspend_until(); destroyed at the end of MainLoop().
51 std::unique_ptr<asio::steady_timer> suspend_timer_;
// Count of wake-ups posted by notify() (stats only).
52 std::atomic_uint_fast64_t notify_cnt_{0};
53 uint64_t main_loop_wakes_{0}, worker_pick_start_ts_{0}, main_suspend_ts_;
// Low bits: number of in-flight notify() calls; bit NOTIFY_GUARD_SHIFT marks
// shutdown (see notify()/MainLoop()).
55 std::atomic_uint_fast32_t notify_guard_{0};
// Lowest nice level that may hold ready fibers; pick_next() scans upward
// from here.
56 uint32_t last_nice_level_ = 0;
// One queue per nice level plus one extra slot for the dispatcher context.
57 ready_queue_type rqueue_arr_[IoFiberProperties::NUM_NICE_LEVELS + 1];
58 std::size_t ready_cnt_{0};
59 std::size_t switch_cnt_{0};
// The fiber executing MainLoop(); rescheduled from suspend_until().
61 fibers::context* main_loop_ctx_ =
nullptr;
// Deadline the suspend timer is currently armed for, or STEADY_PT_MAX.
62 chrono::steady_clock::time_point suspend_tp_ = STEADY_PT_MAX;
// State bits for mask_ (mask_ declaration is not visible in this chunk).
64 enum : uint8_t { LOOP_RUN_ONE = 1, MAIN_LOOP_SUSPEND = 2, MAIN_LOOP_FINISHED = 4 };
69 AsioScheduler(
const std::shared_ptr<asio::io_context>& io_svc)
70 : io_context_(io_svc), suspend_timer_(new asio::steady_timer(*io_svc)) {}
// algorithm_with_properties interface:
74 void awakened(fibers::context* ctx, IoFiberProperties& props) noexcept
override;
76 fibers::context* pick_next() noexcept override;
// Re-files ctx into the queue matching its (changed) nice level.
// NOTE(review): the unlink of ctx and the ready_cnt_ decrement between the
// visible lines are missing from this extraction - confirm against the
// original source.
78 void property_change(boost::fibers::context* ctx, IoFiberProperties& props) noexcept final {
// Nothing to do unless ctx currently sits in a ready queue.
87 if (!ctx->ready_is_linked()) {
93 if (!ctx->is_context(fibers::type::dispatcher_context)) {
94 DCHECK_GT(ready_cnt_, 0);
// Reinsert through awakened() so the context lands at the right level.
101 awakened(ctx, props);
104 bool has_ready_fibers() const noexcept final {
return 0 < ready_cnt_; }
// Called on the dispatcher context when no fiber is ready: arms the suspend
// timer for abs_time (when finite and changed) and hands control back to
// the main loop fiber so it can run io_context events.
108 void suspend_until(chrono::steady_clock::time_point
const& abs_time) noexcept
final {
109 DVLOG(2) <<
"suspend_until " << abs_time.time_since_epoch().count();
// Only the dispatcher context is allowed to suspend the thread.
118 DCHECK(fibers::context::active()->is_context(fibers::type::dispatcher_context));
122 if (abs_time != STEADY_PT_MAX && suspend_tp_ != abs_time) {
140 suspend_tp_ = abs_time;
141 suspend_timer_->expires_at(abs_time);
143 suspend_timer_->async_wait([
this, abs_time](
const system::error_code& ec) {
145 SuspendCb(ec, abs_time);
147 VLOG(1) <<
"Arm suspender at micros from now " << delta_micros(abs_time)
148 <<
", abstime: " << abs_time.time_since_epoch().count();
// If the main loop were blocked inside run_one() nothing could wake it.
150 CHECK_EQ(0, mask_ & LOOP_RUN_ONE) <<
"Deadlock detected";
// Schedule the main loop fiber to resume.
153 main_loop_ctx_->get_scheduler()->schedule(main_loop_ctx_);
160 void notify() noexcept final;
// Timer completion handler: clears the armed deadline. The visible check
// expects operation_aborted; the (missing) lines in between presumably
// branch on ec - TODO confirm against the original source.
165 void SuspendCb(const system::error_code& ec, chrono::steady_clock::time_point tp) {
166 VLOG(1) <<
"Fire suspender " << tp.time_since_epoch().count() <<
" " << ec;
169 suspend_tp_ = STEADY_PT_MAX;
174 CHECK_EQ(ec, asio::error::operation_aborted);
// Parks the main loop fiber until worker fibers quiesce (defined below).
177 void WaitTillFibersSuspend();
180 AsioScheduler::~AsioScheduler() {}
// Drives the asio event loop on the main-context fiber: while worker fibers
// are ready it only polls io completions and yields to them; otherwise it
// blocks in run_one(). After the io_context stops it performs the shutdown
// handshake with notify() before destroying the suspend timer.
// NOTE(review): lossy extraction - closing braces and the spin/wait loop on
// notify_guard_ (original lines ~226-229) are missing below.
182 void AsioScheduler::MainLoop() {
183 asio::io_context* io_cntx = io_context_.get();
184 main_loop_ctx_ = fibers::context::active();
186 while (!io_cntx->stopped()) {
187 if (has_ready_fibers()) {
// Drain pending io handlers without blocking while fibers want the CPU.
188 while (io_cntx->poll())
191 auto start = base::GetMonotonicMicrosFast();
// Give the thread to ready fibers until they all suspend again.
193 WaitTillFibersSuspend();
194 auto delta = base::GetMonotonicMicrosFast() - start;
// Surface stalls: warn when resuming took more than 100ms.
195 LOG_IF(INFO, delta > 100000) <<
"Scheduler: Took " << delta / 1000 <<
" ms to resume";
// No ready fibers - block on a single io event. LOOP_RUN_ONE lets
// suspend_until() detect a would-be deadlock while we are inside run_one().
202 DVLOG(2) <<
"MainLoop::RunOneStart";
203 mask_ |= LOOP_RUN_ONE;
204 if (!io_cntx->run_one()) {
205 mask_ &= ~LOOP_RUN_ONE;
208 DVLOG(2) <<
"MainLoop::RunOneEnd";
209 mask_ &= ~LOOP_RUN_ONE;
212 VLOG(1) <<
"MainLoop exited";
213 mask_ |= MAIN_LOOP_FINISHED;
// Shutdown handshake: set the guard bit so further notify() calls become
// no-ops, then (in lines missing here) apparently wait for the low-bit
// count of in-flight notify() calls to drain before freeing the timer.
224 constexpr uint32_t NOTIFY_BIT = 1U << NOTIFY_GUARD_SHIFT;
225 uint32_t seq = notify_guard_.fetch_or(NOTIFY_BIT);
228 seq = notify_guard_.load() & (NOTIFY_BIT - 1);
230 suspend_timer_.reset();
232 VLOG(1) <<
"MainLoopWakes/NotifyCnt: " << main_loop_wakes_ <<
"/" << notify_cnt_;
// Parks the main loop fiber. MAIN_LOOP_SUSPEND tells awakened()/pick_next()
// that the main loop may need an explicit wake-up; it is cleared once we are
// resumed (via suspend_until() or the anti-starvation path in awakened()).
// NOTE(review): a few lines are missing here (original 242/245) - possibly
// the main_loop_wakes_ increment; confirm against the original source.
235 void AsioScheduler::WaitTillFibersSuspend() {
238 mask_ |= MAIN_LOOP_SUSPEND;
240 DVLOG(2) <<
"WaitTillFibersSuspend:Start";
241 main_suspend_ts_ = base::GetMonotonicMicrosFast();
// Yields this fiber; resumed when the scheduler links us back in.
243 main_loop_ctx_->suspend();
244 mask_ &= ~MAIN_LOOP_SUSPEND;
246 DVLOG(2) <<
"WaitTillFibersSuspend:End";
// Files a newly-ready context into the proper queue: the dispatcher context
// goes to DISPATCH_LEVEL, worker fibers to the queue of their nice level.
// Also implements an anti-starvation rule: if the main loop has been
// suspended for more than 5ms while lower-priority fibers keep running,
// it is linked back into the top-priority queue.
// NOTE(review): lossy extraction - the `else` opener and the ready_cnt_
// increment between the visible lines are missing.
250 void AsioScheduler::awakened(fibers::context* ctx, IoFiberProperties& props) noexcept {
251 DCHECK(!ctx->ready_is_linked());
253 ready_queue_type* rq;
254 if (ctx->is_context(fibers::type::dispatcher_context)) {
255 rq = rqueue_arr_ + DISPATCH_LEVEL;
256 DVLOG(2) <<
"Ready: " << fibers_ext::short_id(ctx) <<
" dispatch" 257 <<
", ready_cnt: " << ready_cnt_;
259 unsigned nice = props.nice_level();
260 DCHECK_LT(nice, IoFiberProperties::NUM_NICE_LEVELS);
261 rq = rqueue_arr_ + nice;
// Track the lowest level that has ready work so pick_next() starts there.
263 if (last_nice_level_ > nice)
264 last_nice_level_ = nice;
266 uint64_t now = base::GetMonotonicMicrosFast();
267 IoFiberPropertiesMgr{&props}.set_awaken_ts(now);
// Anti-starvation: wake the suspended main loop if below-top-priority work
// keeps arriving and the main loop is not already linked as ready.
276 if (nice > MAIN_NICE_LEVEL && (mask_ & MAIN_LOOP_SUSPEND) && switch_cnt_ > 0 &&
277 !main_loop_ctx_->ready_is_linked()) {
// 5000us = 5ms suspension threshold.
278 if (now - main_suspend_ts_ > 5000) {
279 DVLOG(2) <<
"Wake MAIN_LOOP_SUSPEND " << fibers_ext::short_id(main_loop_ctx_)
280 <<
", r/s: " << ready_cnt_ <<
"/" << switch_cnt_;
284 main_loop_ctx_->ready_link(rqueue_arr_[MAIN_NICE_LEVEL]);
285 last_nice_level_ = MAIN_NICE_LEVEL;
290 DVLOG(2) <<
"Ready: " << fibers_ext::short_id(ctx) <<
"/" << props.name()
291 <<
", nice/rdc: " << nice <<
"/" << ready_cnt_;
// Finally link the context into its chosen queue.
294 ctx->ready_link(*rq);
// Picks the next fiber to run: scans the per-nice-level ready queues upward
// from last_nice_level_; if a worker fiber is found it is returned (with its
// resume timestamp stamped). When no worker is ready, falls back to the
// dispatcher queue, and finally returns null.
// NOTE(review): lossy extraction - the queue-empty check, the pop/unlink of
// ctx and the ready_cnt_ decrement (original ~307-313) are missing.
297 fibers::context* AsioScheduler::pick_next() noexcept {
298 fibers::context* ctx(
nullptr);
299 using fibers_ext::short_id;
301 auto now = base::GetMonotonicMicrosFast();
// delta = time the previous pick-to-pick interval took on this thread.
302 auto delta = now - worker_pick_start_ts_;
303 worker_pick_start_ts_ = now;
305 for (; last_nice_level_ < IoFiberProperties::NUM_NICE_LEVELS; ++last_nice_level_) {
306 auto& q = rqueue_arr_[last_nice_level_];
314 DCHECK(!ctx->is_context(fibers::type::dispatcher_context));
315 DCHECK_GT(ready_cnt_, 0);
318 RAW_VLOG(2,
"Switching from %x to %x switch_cnt(%d)", short_id(), short_id(ctx), switch_cnt_);
319 DCHECK(ctx != fibers::context::active());
// Record when this fiber actually gets the CPU.
321 IoFiberPropertiesMgr{ctx->get_properties()}.set_resume_ts(now);
// While the main loop is suspended, report fibers that hogged the thread.
326 if ((mask_ & MAIN_LOOP_SUSPEND) && last_nice_level_ > MAIN_NICE_LEVEL) {
330 auto* active = fibers::context::active();
331 if (!active->is_context(fibers::type::main_context)) {
332 auto& props = static_cast<IoFiberProperties&>(*active->get_properties());
334 LOG(INFO) << props.name() <<
" took " << delta / 1000 <<
" ms";
346 RAW_VLOG(3,
"pick_next: %x", short_id(ctx));
// No worker fiber is ready at this point.
351 DCHECK_EQ(0, ready_cnt_);
353 auto& dispatch_q = rqueue_arr_[DISPATCH_LEVEL];
354 if (!dispatch_q.empty()) {
// Shadows the outer ctx intentionally or not - worth a second look.
355 fibers::context* ctx = &dispatch_q.front();
356 dispatch_q.pop_front();
// NOTE(review): RAW_VLOG is printf-style elsewhere in this file but is
// passed a comma list here - looks suspicious; confirm against original.
358 RAW_VLOG(2,
"Switching from ", short_id(),
" to dispatch ", short_id(ctx),
359 ", mask: ",
unsigned(mask_));
363 RAW_VLOG(2,
"pick_next: null");
// Wakes the io loop from any thread. notify_guard_'s low 16 bits count
// concurrent callers; the high bit (set by MainLoop at shutdown) turns late
// calls into no-ops so we never post to a dying io_context.
// NOTE(review): `seq >> 16` hard-codes NOTIFY_GUARD_SHIFT - should use the
// constant. Branch structure between the visible lines is missing from this
// extraction.
368 void AsioScheduler::notify() noexcept {
369 uint32_t seq = notify_guard_.fetch_add(1, std::memory_order_acq_rel);
// Shutdown bit not set: safe to touch io_context_.
371 if ((seq >> 16) == 0) {
379 RAW_VLOG(1,
"AsioScheduler::notify");
// A posted no-op handler forces run_one() to return; the yield lets the
// dispatcher run once the handler executes inside a fiber.
389 asio::post(*io_context_, [] { this_fiber::yield(); });
390 notify_cnt_.fetch_add(1, std::memory_order_relaxed);
392 RAW_VLOG(1,
"Called during shutdown phase");
// Leave the guard; MainLoop waits for this count to drain at shutdown.
394 notify_guard_.fetch_sub(1, std::memory_order_acq_rel);
399 constexpr
unsigned IoFiberProperties::MAX_NICE_LEVEL;
400 constexpr
unsigned IoFiberProperties::NUM_NICE_LEVELS;
// Sets the fiber's nice level, clamped to MAX_NICE_LEVEL.
// NOTE(review): the rest of the body (original ~406-411, presumably storing
// the value and notifying the scheduler on change) is missing from this
// extraction.
402 void IoFiberProperties::SetNiceLevel(
unsigned p) {
405 p = std::min(p, MAX_NICE_LEVEL);
// Installs AsioScheduler as this thread's fiber-scheduling algorithm, names
// the main fiber "io_loop" at top priority, records the thread id, launches
// MainLoop() in a fiber, and finally runs cleanup polling passes.
// NOTE(review): heavily elided extraction - the use of `bc` (presumably
// signalled once the loop is up), the Async lambda's tail and several
// closing braces are missing.
412 void IoContext::StartLoop(BlockingCounter* bc) {
// The scheduler object is owned by the fiber manager once set_algo() runs.
415 AsioScheduler* scheduler =
new AsioScheduler(context_ptr_);
416 fibers::context::active()->get_scheduler()->set_algo(scheduler);
417 this_fiber::properties<IoFiberProperties>().set_name(
"io_loop");
418 this_fiber::properties<IoFiberProperties>().SetNiceLevel(MAIN_NICE_LEVEL);
419 CHECK(fibers::context::active()->is_context(fibers::type::main_context));
421 thread_id_ = this_thread::get_id();
423 io_context& io_cntx = *context_ptr_;
429 Async([scheduler, bc] {
431 scheduler->MainLoop();
// Two drain passes: handlers may schedule fibers and vice versa.
439 for (
unsigned i = 0; i < 2; ++i) {
440 DVLOG(1) <<
"Cleanup Loop " << i;
441 while (io_cntx.poll() || scheduler->has_ready_fibers()) {
// Stops the context: first cancels all registered cancellables and waits
// for them (via a BlockingCounter sized to their number), then stops the
// underlying asio io_context.
// NOTE(review): the bodies of the two loops over cancellable_arr_ (issuing
// Cancel and joining) are missing from this extraction.
448 void IoContext::Stop() {
449 if (cancellable_arr_.size() > 0) {
450 fibers_ext::BlockingCounter cancel_bc(cancellable_arr_.size());
452 VLOG(1) <<
"Cancelling " << cancellable_arr_.size() <<
" cancellables";
454 for (
auto& k_v : cancellable_arr_) {
461 for (
auto& k_v : cancellable_arr_) {
464 cancellable_arr_.clear();
467 context_ptr_->stop();
468 VLOG(1) <<
"AsioIoContext stopped";