// uring_fiber_algo.cc
1 // Copyright 2020, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 
5 #include "util/uring/uring_fiber_algo.h"
6 
7 #include "base/logging.h"
8 #include "util/uring/proactor.h"
9 
10 // TODO: We should replace DVLOG macros with RAW_VLOG if we do glog sync integration.
11 
12 namespace util {
13 namespace uring {
14 using namespace boost;
15 using namespace std;
16 
17 UringFiberAlgo::UringFiberAlgo(Proactor* proactor) : proactor_(proactor) {
18  main_cntx_ = fibers::context::active();
19  CHECK(main_cntx_->is_context(fibers::type::main_context));
20 }
21 
22 UringFiberAlgo::~UringFiberAlgo() {
23 }
24 
25 void UringFiberAlgo::awakened(FiberContext* ctx, UringFiberProps& props) noexcept {
26  DCHECK(!ctx->ready_is_linked());
27 
28  if (ctx->is_context(fibers::type::dispatcher_context)) {
29  DVLOG(2) << "Awakened dispatch";
30  } else {
31  DVLOG(2) << "Awakened " << props.name();
32 
33  ++ready_cnt_; // increase the number of awakened/ready fibers.
34  }
35 
36  ctx->ready_link(rqueue_); /*< fiber, enqueue on ready queue >*/
37 }
38 
39 auto UringFiberAlgo::pick_next() noexcept -> FiberContext* {
40  DVLOG(2) << "pick_next: " << ready_cnt_ << "/" << rqueue_.size();
41 
42  if (rqueue_.empty())
43  return nullptr;
44 
45  FiberContext* ctx = &rqueue_.front();
46  rqueue_.pop_front();
47 
48  if (!ctx->is_context(boost::fibers::type::dispatcher_context)) {
49  --ready_cnt_;
50  UringFiberProps* props = (UringFiberProps*)ctx->get_properties();
51  DVLOG(1) << "Switching to " << props->name(); // TODO: to switch to RAW_LOG.
52  } else {
53  DVLOG(1) << "Switching to dispatch"; // TODO: to switch to RAW_LOG.
54  }
55  return ctx;
56 }
57 
58 void UringFiberAlgo::property_change(FiberContext* ctx, UringFiberProps& props) noexcept {
59  if (!ctx->ready_is_linked()) {
60  return;
61  }
62 
63  // Found ctx: unlink it
64  ctx->ready_unlink();
65  if (!ctx->is_context(fibers::type::dispatcher_context)) {
66  --ready_cnt_;
67  }
68 
69  // Here we know that ctx was in our ready queue, but we've unlinked
70  // it. We happen to have a method that will (re-)add a context* to the
71  // right place in the ready queue.
72  awakened(ctx, props);
73 }
74 
75 bool UringFiberAlgo::has_ready_fibers() const noexcept {
76  return ready_cnt_ > 0;
77 }
78 
// suspend_until halts the thread in case there are no active fibers to run on it.
// This function is called by dispatcher fiber.
void UringFiberAlgo::suspend_until(const time_point& abs_time) noexcept {
  auto* cur_cntx = fibers::context::active();

  DCHECK(cur_cntx->is_context(fibers::type::dispatcher_context));

  // A finite deadline means some fiber sleeps with a timeout: arm an io_uring
  // timeout SQE whose completion callback yields, so the scheduler gets a
  // chance to run once the deadline (or an earlier completion) fires.
  if (time_point::max() != abs_time) {
    auto cb = [](Proactor::IoResult res, int64_t, Proactor*) {
      // If io_uring does not support timeout, then this callback will be called
      // earlier than needed and dispatch won't awake the sleeping fiber.
      // This will cause deadlock.
      DCHECK_NE(res, -EINVAL) << "This linux version does not support this operation";
      DVLOG(1) << "this_fiber::yield " << res;

      this_fiber::yield();
    };

    // TODO: if we got here, most likely our completion queues were empty so
    // it's unlikely that we will have full submit queue but this state may happen.
    // GetSubmitEntry may block which may cause a deadlock since our main loop is not
    // running (it's probably in suspend mode letting dispatcher fiber to run).
    // Therefore we must use here non blocking calls.
    // But what happens if SQ is full?
    // SQ is full we can not use IoUring to schedule awake event, our CQ queue is empty so
    // we have nothing to process. We might want to give up on this timer and just wait on CQ
    // since we know something might come up. On the other hand, imagine we send requests on sockets
    // but they all do not answer so SQ is eventually full, CQ is empty and our IO loop is overflown
    // and no entries could be processed.
    // We must reproduce this case: small SQ/CQ. Fill SQ/CQ with alarms that expire in a long time.
    // So at some point SQ-push returns EBUSY. Now we call this_fiber::sleep and we GetSubmitEntry
    // would block.
    SubmitEntry se = proactor_->GetSubmitEntry(std::move(cb), 0);
    using namespace chrono;
    constexpr uint64_t kNsFreq = 1000000000ULL;

    // Convert the absolute steady_clock deadline into the seconds/nanoseconds
    // split of a timespec, the format the timeout SQE expects.
    const chrono::time_point<steady_clock, nanoseconds>& tp = abs_time;
    uint64_t ns = time_point_cast<nanoseconds>(tp).time_since_epoch().count();
    ts_.tv_sec = ns / kNsFreq;
    ts_.tv_nsec = ns - ts_.tv_sec * kNsFreq;

    // Please note that we can not pass var on stack because we exit from the function
    // before we submit to ring. That's why ts_ is a data member.
    // NOTE(review): a single ts_ member implies at most one pending timeout at a
    // time from this function — presumably guaranteed because only the dispatcher
    // fiber calls it; confirm.
    se.PrepTimeout(&ts_);
  }

  // schedule does not block just marks main_cntx_ for activation.
  main_cntx_->get_scheduler()->schedule(main_cntx_);
}
127 
128 // This function is called from remote threads, to wake this thread in case it's sleeping.
129 // In our case, "sleeping" means - might stuck the wait function waiting for completion events.
130 // wait_for_cqe is the only place where the thread can be stalled.
131 void UringFiberAlgo::notify() noexcept {
132  DVLOG(1) << "notify from " << syscall(SYS_gettid);
133 
134  // We signal so that
135  // 1. Main context should awake if it is not
136  // 2. it needs to yield to dispatch context that will put active fibers into
137  // ready queue.
138  auto prev_val = proactor_->tq_seq_.fetch_or(1, std::memory_order_relaxed);
139  if (prev_val == Proactor::WAIT_SECTION_STATE) {
140  proactor_->WakeRing();
141  }
142 }
143 
144 } // namespace uring
145 } // namespace util