io_context.cc
// Copyright 2018, Beeri 15. All rights reserved.
// Author: Roman Gershman (romange@gmail.com)
//

#include <boost/asio/steady_timer.hpp>
#include <boost/fiber/mutex.hpp>
#include <boost/fiber/operations.hpp>
#include <boost/fiber/scheduler.hpp>

#include "base/logging.h"
#include "base/walltime.h"

#include <glog/raw_logging.h>

#include "util/asio/io_context.h"

namespace util {

using fibers_ext::BlockingCounter;
using namespace boost;
using namespace std;

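// Helper that exposes write access to the private scheduling timestamps of IoFiberProperties
// (presumably declared a friend in io_context.h). The scheduler below uses it to record when a
// fiber was awakened and when it was resumed.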
class IoFiberPropertiesMgr {
  IoFiberProperties& io_props;

 public:
  IoFiberPropertiesMgr(fibers::fiber_properties* props)
      : io_props(*static_cast<IoFiberProperties*>(props)) {}

  void set_resume_ts(uint64_t ts) { io_props.resume_ts_ = ts; }
  void set_awaken_ts(uint64_t ts) { io_props.awaken_ts_ = ts; }
};

namespace {
constexpr unsigned MAIN_NICE_LEVEL = 0;
constexpr unsigned DISPATCH_LEVEL = IoFiberProperties::NUM_NICE_LEVELS;

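// AsioScheduler::notify_guard_ keeps a count of in-flight notify() calls in its low 16 bits and
// uses bit NOTIFY_GUARD_SHIFT as a shutdown flag; see MainLoop() and notify() below.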
constexpr unsigned NOTIFY_GUARD_SHIFT = 16;
constexpr chrono::steady_clock::time_point STEADY_PT_MAX = chrono::steady_clock::time_point::max();

inline int64_t delta_micros(const chrono::steady_clock::time_point tp) {
  static_assert(8 == sizeof(chrono::steady_clock::time_point), "");
  return chrono::duration_cast<chrono::microseconds>(tp - chrono::steady_clock::now()).count();
}

class AsioScheduler final : public fibers::algo::algorithm_with_properties<IoFiberProperties> {
 private:
  using ready_queue_type = fibers::scheduler::ready_queue_type;
  std::shared_ptr<asio::io_context> io_context_;
  std::unique_ptr<asio::steady_timer> suspend_timer_;
  std::atomic_uint_fast64_t notify_cnt_{0};
  uint64_t main_loop_wakes_{0}, worker_pick_start_ts_{0}, main_suspend_ts_;

  std::atomic_uint_fast32_t notify_guard_{0};
  uint32_t last_nice_level_ = 0;
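  // One ready queue per nice level, plus a dedicated slot (DISPATCH_LEVEL) for the dispatcher
  // fiber.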
  ready_queue_type rqueue_arr_[IoFiberProperties::NUM_NICE_LEVELS + 1];
  std::size_t ready_cnt_{0};
  std::size_t switch_cnt_{0};

  fibers::context* main_loop_ctx_ = nullptr;
  chrono::steady_clock::time_point suspend_tp_ = STEADY_PT_MAX;

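  // State bits for the main loop: LOOP_RUN_ONE is set while MainLoop() is blocked inside
  // io_context::run_one(), MAIN_LOOP_SUSPEND while it is suspended in WaitTillFibersSuspend(),
  // and MAIN_LOOP_FINISHED once the loop has exited.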
  enum : uint8_t { LOOP_RUN_ONE = 1, MAIN_LOOP_SUSPEND = 2, MAIN_LOOP_FINISHED = 4 };
  uint8_t mask_ = 0;

 public:
  //[asio_rr_ctor
  AsioScheduler(const std::shared_ptr<asio::io_context>& io_svc)
      : io_context_(io_svc), suspend_timer_(new asio::steady_timer(*io_svc)) {}

  ~AsioScheduler();

  void awakened(fibers::context* ctx, IoFiberProperties& props) noexcept override;

  fibers::context* pick_next() noexcept override;

  void property_change(boost::fibers::context* ctx, IoFiberProperties& props) noexcept final {
    // Although IoFiberProperties defines multiple properties, only one of them
    // (the nice level) actually calls notify() when changed. The point of a
    // property_change() override is to reshuffle the ready queue according to
    // the updated nice value.

    // 'ctx' might not be in our queue at all, if the caller is changing the
    // nice level of (say) the running fiber. If it's not there, no need to
    // move it: we'll handle it next time it hits awakened().
    if (!ctx->ready_is_linked()) {
      return;
    }

    // Found ctx: unlink it.
    ctx->ready_unlink();
    if (!ctx->is_context(fibers::type::dispatcher_context)) {
      DCHECK_GT(ready_cnt_, 0);
      --ready_cnt_;
    }

    // Here we know that ctx was in our ready queue, but we've unlinked it.
    // We happen to have a method that will (re-)add a context* to the right
    // place in the ready queue.
    awakened(ctx, props);
  }

  bool has_ready_fibers() const noexcept final { return 0 < ready_cnt_; }

  // suspend_until halts the thread in case there are no active fibers to run on it.
  // This is done by the dispatcher fiber.
  void suspend_until(chrono::steady_clock::time_point const& abs_time) noexcept final {
    DVLOG(2) << "suspend_until " << abs_time.time_since_epoch().count();

    // Only the dispatcher context stops the thread.
    // That means that either the main loop is in MAIN_LOOP_SUSPEND state, or it has finished and
    // we are in the shutdown phase.
    // suspend_until is responsible for THREAD SUSPEND, which is not what MAIN_LOOP_SUSPEND is
    // for. In fact, we implement THREAD SUSPEND by deferring control to asio::io_context::run_one.
    // For that to happen we need to resume from MAIN_LOOP_SUSPEND, and that's why we
    // schedule main_loop_ctx_ below.
    DCHECK(fibers::context::active()->is_context(fibers::type::dispatcher_context));

    // Set a timer so at least one handler will eventually fire, causing
    // run_one() to eventually return.
    if (abs_time != STEADY_PT_MAX && suspend_tp_ != abs_time) {
      // Each expires_at(time_point) call cancels any previous pending
      // call. We could inadvertently spin like this:
      //   dispatcher calls suspend_until() with earliest wake time
      //   suspend_until() sets suspend_timer_,
      //   loop() calls run_one()
      //   some other asio handler runs before timer expires
      //   run_one() returns to loop()
      //   loop() yields to dispatcher
      //   dispatcher finds no ready fibers
      //   dispatcher calls suspend_until() with SAME wake time
      //   suspend_until() sets suspend_timer_ to same time, canceling
      //   previous async_wait()
      //   loop() calls run_one()
      //   asio calls suspend_timer_ handler with operation_aborted
      //   run_one() returns to loop()... etc. etc.
      // So only actually set the timer when we're passed a DIFFERENT
      // abs_time value.
      suspend_tp_ = abs_time;
      suspend_timer_->expires_at(abs_time);

      suspend_timer_->async_wait([this, abs_time](const system::error_code& ec) {
        // Call the suspend handler.
        SuspendCb(ec, abs_time);
      });
      VLOG(1) << "Arm suspender at micros from now " << delta_micros(abs_time)
              << ", abstime: " << abs_time.time_since_epoch().count();
    }
    CHECK_EQ(0, mask_ & LOOP_RUN_ONE) << "Deadlock detected";

    // Awake main_loop_ctx_ in WaitTillFibersSuspend().
    main_loop_ctx_->get_scheduler()->schedule(main_loop_ctx_);
  }
  //]

  // This function is called from remote threads to wake this thread in case it's sleeping.
  // In our case, "sleeping" means the thread might be stuck inside run_one() waiting for events.
  // We break from run_one() by scheduling a timer.
  void notify() noexcept final;

  void MainLoop();

 private:
  void SuspendCb(const system::error_code& ec, chrono::steady_clock::time_point tp) {
    VLOG(1) << "Fire suspender " << tp.time_since_epoch().count() << " " << ec;
    if (!ec) {
      // The timer has successfully finished.
      suspend_tp_ = STEADY_PT_MAX;

      // Switch to the dispatcher fiber to awaken fibers.
      this_fiber::yield();
    } else {
      CHECK_EQ(ec, asio::error::operation_aborted);
    }
  }
  void WaitTillFibersSuspend();
};

AsioScheduler::~AsioScheduler() {}

void AsioScheduler::MainLoop() {
  asio::io_context* io_cntx = io_context_.get();
  main_loop_ctx_ = fibers::context::active();

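  // The loop interleaves asio handlers with fiber scheduling: while worker fibers are ready we
  // drain pending handlers with poll() and then suspend in WaitTillFibersSuspend() to let them
  // run; once no fibers are ready we block inside run_one() until asio produces more work.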
  while (!io_cntx->stopped()) {
    if (has_ready_fibers()) {
      while (io_cntx->poll())
        ;

      auto start = base::GetMonotonicMicrosFast();
      // Gives up control to allow other fibers to run in the thread.
      WaitTillFibersSuspend();
      auto delta = base::GetMonotonicMicrosFast() - start;
      LOG_IF(INFO, delta > 100000) << "Scheduler: Took " << delta / 1000 << " ms to resume";

      continue;
    }

    // Run one handler inside io_context.
    // If no handler is available, this blocks the thread.
    DVLOG(2) << "MainLoop::RunOneStart";
    mask_ |= LOOP_RUN_ONE;
    if (!io_cntx->run_one()) {
      mask_ &= ~LOOP_RUN_ONE;
      break;
    }
    DVLOG(2) << "MainLoop::RunOneEnd";
    mask_ &= ~LOOP_RUN_ONE;
  }

  VLOG(1) << "MainLoop exited";
  mask_ |= MAIN_LOOP_FINISHED;

  // We won't call run_one() anymore, therefore we can remove our dependence on suspend_timer_
  // and asio::io_context. In fact, from now on we can not use timed waits in fiber code either,
  // because we've relied on asio for those.
  // We guard suspend_timer_ with notify_guard_ so that notify() needs no locks; here, however,
  // we must block until outstanding notify() calls drain.
  // It's the shutdown phase, so we do not care. We can not rely on fiber scheduling methods since
  // they rely on the loop above, therefore we just use a thread yield.

  // Signal that we are shutting down and check whether we need to wait for current notify calls
  // to exit.
  constexpr uint32_t NOTIFY_BIT = 1U << NOTIFY_GUARD_SHIFT;
  uint32_t seq = notify_guard_.fetch_or(NOTIFY_BIT);
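  // fetch_or returns the pre-update value, so the low bits of seq count notify() calls that are
  // still in flight.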
  while (seq) {
    pthread_yield();  // almost like a sleep
    seq = notify_guard_.load() & (NOTIFY_BIT - 1);  // Block until all notify calls have exited.
  }
  suspend_timer_.reset();  // Now we can free suspend_timer_.

  VLOG(1) << "MainLoopWakes/NotifyCnt: " << main_loop_wakes_ << "/" << notify_cnt_;
}

void AsioScheduler::WaitTillFibersSuspend() {
  // Block this fiber till all (ready) fibers are processed,
  // or until AsioScheduler::suspend_until() has been called or awakened() decided to resume it.
  mask_ |= MAIN_LOOP_SUSPEND;

  DVLOG(2) << "WaitTillFibersSuspend:Start";
  main_suspend_ts_ = base::GetMonotonicMicrosFast();

  main_loop_ctx_->suspend();
  mask_ &= ~MAIN_LOOP_SUSPEND;
  switch_cnt_ = 0;
  DVLOG(2) << "WaitTillFibersSuspend:End";
}

// Thread-local function.
void AsioScheduler::awakened(fibers::context* ctx, IoFiberProperties& props) noexcept {
  DCHECK(!ctx->ready_is_linked());

  ready_queue_type* rq;
  if (ctx->is_context(fibers::type::dispatcher_context)) {
    rq = rqueue_arr_ + DISPATCH_LEVEL;
    DVLOG(2) << "Ready: " << fibers_ext::short_id(ctx) << " dispatch"
             << ", ready_cnt: " << ready_cnt_;
  } else {
    unsigned nice = props.nice_level();
    DCHECK_LT(nice, IoFiberProperties::NUM_NICE_LEVELS);
    rq = rqueue_arr_ + nice;
    ++ready_cnt_;  // Increase the number of awakened/ready fibers.
    if (last_nice_level_ > nice)
      last_nice_level_ = nice;

    uint64_t now = base::GetMonotonicMicrosFast();
    IoFiberPropertiesMgr{&props}.set_awaken_ts(now);

    // In addition, we wake main_loop_ctx_ if too many switches occurred while it was suspended.
    // This is a convenient place to wake it because we are sure there is at least one ready
    // worker in addition to main_loop_ctx_, so it won't get stuck in run_one().
    // * main_loop_ctx_ could already be linked by previous invocations of awakened()
    //   before pick_next() resumed it.
    if (nice > MAIN_NICE_LEVEL && (mask_ & MAIN_LOOP_SUSPEND) && switch_cnt_ > 0 &&
        !main_loop_ctx_->ready_is_linked()) {
      if (now - main_suspend_ts_ > 5000) {  // 5ms
        DVLOG(2) << "Wake MAIN_LOOP_SUSPEND " << fibers_ext::short_id(main_loop_ctx_)
                 << ", r/s: " << ready_cnt_ << "/" << switch_cnt_;

        switch_cnt_ = 0;
        ++ready_cnt_;
        main_loop_ctx_->ready_link(rqueue_arr_[MAIN_NICE_LEVEL]);
        last_nice_level_ = MAIN_NICE_LEVEL;
        ++main_loop_wakes_;
      }
    }

    DVLOG(2) << "Ready: " << fibers_ext::short_id(ctx) << "/" << props.name()
             << ", nice/rdc: " << nice << "/" << ready_cnt_;
  }

  ctx->ready_link(*rq); /*< fiber, enqueue on ready queue >*/
}

fibers::context* AsioScheduler::pick_next() noexcept {
  fibers::context* ctx(nullptr);
  using fibers_ext::short_id;

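  // delta is the time since the previous pick_next() call, i.e. roughly how long the previously
  // picked fiber ran before control returned to the scheduler.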
  auto now = base::GetMonotonicMicrosFast();
  auto delta = now - worker_pick_start_ts_;
  worker_pick_start_ts_ = now;

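  // Scan the worker ready queues starting from the best (lowest) nice level that might be
  // non-empty; awakened() lowers last_nice_level_ whenever a higher-priority fiber becomes ready.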
  for (; last_nice_level_ < IoFiberProperties::NUM_NICE_LEVELS; ++last_nice_level_) {
    auto& q = rqueue_arr_[last_nice_level_];
    if (q.empty())
      continue;

    // Pop an item from the ready queue.
    ctx = &q.front();
    q.pop_front();

    DCHECK(!ctx->is_context(fibers::type::dispatcher_context));
    DCHECK_GT(ready_cnt_, 0);
    --ready_cnt_;

    RAW_VLOG(2, "Switching from %x to %x switch_cnt(%d)", short_id(), short_id(ctx), switch_cnt_);
    DCHECK(ctx != fibers::context::active());

    IoFiberPropertiesMgr{ctx->get_properties()}.set_resume_ts(now);

    // Check whether we want to resume the main loop prematurely to preserve the responsiveness
    // of the IO loop. MAIN_NICE_LEVEL is reserved for the main loop, so we count switches only
    // when we switch to other fibers and the loop is in MAIN_LOOP_SUSPEND state.
    if ((mask_ & MAIN_LOOP_SUSPEND) && last_nice_level_ > MAIN_NICE_LEVEL) {
      ++switch_cnt_;

      if (delta > 30000) {
        auto* active = fibers::context::active();
        if (!active->is_context(fibers::type::main_context)) {
          auto& props = static_cast<IoFiberProperties&>(*active->get_properties());

          LOG(INFO) << props.name() << " took " << delta / 1000 << " ms";
        }
      }

      // We can not prematurely wake MAIN_LOOP_SUSPEND if ready_cnt_ == 0.
      // The reason is that in that case the main loop would call run_one() during the next
      // iteration and might get stuck there, because we may never reach suspend_until(),
      // which could configure the asio loop to break early.
      // This is why we break from MAIN_LOOP_SUSPEND inside the awakened() call, where we are
      // sure there is at least one worker fiber, so the main loop won't get stuck in run_one().
    }

    RAW_VLOG(3, "pick_next: %x", short_id(ctx));

    return ctx;
  }

  DCHECK_EQ(0, ready_cnt_);

  auto& dispatch_q = rqueue_arr_[DISPATCH_LEVEL];
  if (!dispatch_q.empty()) {
    fibers::context* ctx = &dispatch_q.front();
    dispatch_q.pop_front();

    RAW_VLOG(2, "Switching from %x to dispatch %x, mask: %u", short_id(), short_id(ctx),
             unsigned(mask_));
    return ctx;
  }

  RAW_VLOG(2, "pick_next: null");

  return nullptr;
}

void AsioScheduler::notify() noexcept {
  uint32_t seq = notify_guard_.fetch_add(1, std::memory_order_acq_rel);

  if ((seq >> NOTIFY_GUARD_SHIFT) == 0) {  // Test whether we are not shutting down.

    // We need to break from run_one asap.
    // If we've already armed suspend_timer_ via suspend_until then cancelling it would suffice.
    // However, in case suspend_until has not been called, or was called with STEADY_PT_MAX,
    // there is nothing to cancel and run_one won't break.
    // In addition, AsioScheduler::notify is called from a remote thread, so it may only access
    // thread-safe data. Therefore, the simplest solution is just to post a callback.
    RAW_VLOG(1, "AsioScheduler::notify");

    /*suspend_timer_->expires_at(chrono::steady_clock::now());
    suspend_timer_->async_wait([this](const system::error_code& ec) {
      VLOG(1) << "Awake suspender, tp: " << suspend_tp_.time_since_epoch().count() << " " << ec;
      this_fiber::yield();
    });*/

    // The yield is needed so that when the main fiber is woken up, it switches to the dispatcher
    // fiber to run the scheduler awaken hooks.
    asio::post(*io_context_, [] { this_fiber::yield(); });
    notify_cnt_.fetch_add(1, std::memory_order_relaxed);
  } else {
    RAW_VLOG(1, "Called during shutdown phase");
  }
  notify_guard_.fetch_sub(1, std::memory_order_acq_rel);
}

}  // namespace

constexpr unsigned IoFiberProperties::MAX_NICE_LEVEL;
constexpr unsigned IoFiberProperties::NUM_NICE_LEVELS;

void IoFiberProperties::SetNiceLevel(unsigned p) {
  // Of course, it's only worth reshuffling the queue and all if we're
  // actually changing the nice level.
  p = std::min(p, MAX_NICE_LEVEL);
  if (p != nice_) {
    nice_ = p;
    notify();
  }
}

void IoContext::StartLoop(BlockingCounter* bc) {
  // I do not use use_scheduling_algorithm() because I want to retain access to the scheduler.
  // fibers::use_scheduling_algorithm<AsioScheduler>(io_ptr);
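  // Note: set_algo() presumably takes ownership of the raw pointer (Boost.Fibers scheduling
  // algorithms are reference-counted), which is why there is no matching delete here.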
  AsioScheduler* scheduler = new AsioScheduler(context_ptr_);
  fibers::context::active()->get_scheduler()->set_algo(scheduler);
  this_fiber::properties<IoFiberProperties>().set_name("io_loop");
  this_fiber::properties<IoFiberProperties>().SetNiceLevel(MAIN_NICE_LEVEL);
  CHECK(fibers::context::active()->is_context(fibers::type::main_context));

  thread_id_ = this_thread::get_id();

  io_context& io_cntx = *context_ptr_;

  // We run the main loop inside the callback of io_context, blocking it until the loop exits.
  // The reason for this is that io_context::running_in_this_thread() is deduced based on the
  // call-stack. GAIA code should use InContextThread() to check whether the code runs in the
  // context's thread.
  Async([scheduler, bc] {
    bc->Dec();
    scheduler->MainLoop();
  });

  // Bootstrap - launch the callback handler above.
  // It will block until MainLoop exits. See the comment above.
  io_cntx.run_one();

  // Shutdown phase.
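  // Two cleanup passes, presumably to catch handlers and fibers that get scheduled while the
  // first pass is draining the context.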
  for (unsigned i = 0; i < 2; ++i) {
    DVLOG(1) << "Cleanup Loop " << i;
    while (io_cntx.poll() || scheduler->has_ready_fibers()) {
      this_fiber::yield();  // While something happens, pass control to other fibers.
    }
    io_cntx.restart();
  }
}

void IoContext::Stop() {
  if (cancellable_arr_.size() > 0) {
    fibers_ext::BlockingCounter cancel_bc(cancellable_arr_.size());

    VLOG(1) << "Cancelling " << cancellable_arr_.size() << " cancellables";
    // Shutdown sequence and cleanup.
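    // Each Cancel() runs in its own fiber; cancel_bc lets us wait until all of them have
    // completed before joining the fibers stored in cancellable_arr_.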
    for (auto& k_v : cancellable_arr_) {
      AsyncFiber([&] {
        k_v.first->Cancel();
        cancel_bc.Dec();
      });
    }
    cancel_bc.Wait();
    for (auto& k_v : cancellable_arr_) {
      k_v.second.join();
    }
    cancellable_arr_.clear();
  }

  context_ptr_->stop();
  VLOG(1) << "AsioIoContext stopped";
}

}  // namespace util