// proactor.cc  (filename banner from the extracted source listing)
1 // Copyright 2020, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 
5 #include "util/uring/proactor.h"
6 
7 #include <liburing.h>
8 #include <string.h>
9 #include <sys/eventfd.h>
10 #include <sys/poll.h>
11 
12 #include <boost/fiber/operations.hpp>
13 #include <boost/fiber/scheduler.hpp>
14 
15 #include "absl/base/attributes.h"
16 #include "base/logging.h"
17 #include "base/macros.h"
18 #include "util/uring/uring_fiber_algo.h"
19 
20 #define URING_CHECK(x) \
21  do { \
22  int __res_val = (x); \
23  if (UNLIKELY(__res_val < 0)) { \
24  char buf[128]; \
25  char* str = strerror_r(-__res_val, buf, sizeof(buf)); \
26  LOG(FATAL) << "Error " << (-__res_val) << " evaluating '" #x "': " << str; \
27  } \
28  } while (false)
29 
30 #ifndef __NR_io_uring_enter
31 #define __NR_io_uring_enter 426
32 #endif
33 
34 using namespace boost;
35 namespace ctx = boost::context;
36 
37 namespace util {
38 namespace uring {
39 
40 namespace {
41 
// Raw io_uring_enter(2) syscall wrapper. Bypasses liburing so a signal mask
// can be passed atomically with the enter; the trailing argument is the
// kernel-expected sigset size in bytes (_NSIG / 8).
inline int sys_io_uring_enter(int fd, unsigned to_submit, unsigned min_complete, unsigned flags,
                              sigset_t* sig) {
  return syscall(__NR_io_uring_enter, fd, to_submit, min_complete, flags, sig, _NSIG / 8);
}
46 
// Blocks until at least one CQE is available (min_complete=1 with
// IORING_ENTER_GETEVENTS). Returns 0 on success, or -1 if the wait was
// interrupted by a signal (EINTR); any other failure is fatal.
// NOINLINE keeps this cold blocking path out of the hot-loop icache.
ABSL_ATTRIBUTE_NOINLINE int wait_for_cqe(io_uring* ring, sigset_t* sig = NULL) {
  // res must be 0 or -1.
  int res = sys_io_uring_enter(ring->ring_fd, 0, 1, IORING_ENTER_GETEVENTS, sig);
  // errno is only meaningful when res == -1; the short-circuit below reads
  // it only in that case.
  if (res == 0 || errno == EINTR)
    return res;
  DCHECK_EQ(-1, res);
  res = errno;

  LOG(FATAL) << "Error " << (res) << " evaluating sys_io_uring_enter: " << strerror(res);
  return 0;
}
58 
// Process-wide registry mapping a signal number to the Proactor that should
// handle it and the user callback to invoke. Indexed directly by signal
// number, hence _NSIG slots.
struct signal_state {
  struct Item {
    Proactor* proactor = nullptr;
    std::function<void(int)> cb;
  };

  Item signal_map[_NSIG];
};
67 
68 signal_state* get_signal_state() {
69  static signal_state state;
70 
71  return &state;
72 }
73 
// Signal handler installed by Proactor::RegisterSignal. Instead of running
// the user callback in signal context, it forwards the call to the
// registered proactor's loop via AsyncFiber.
void SigAction(int signal, siginfo_t*, void*) {
  signal_state* state = get_signal_state();
  DCHECK_LT(signal, _NSIG);

  auto& item = state->signal_map[signal];
  // 'item' lives in static storage (see get_signal_state), so capturing it
  // by reference is safe even though the lambda runs later on the proactor
  // thread.
  auto cb = [signal, &item] { item.cb(signal); };

  if (item.proactor && item.cb) {
    item.proactor->AsyncFiber(std::move(cb));
  } else {
    LOG(ERROR) << "Tangling signal handler " << signal;
  }
}
87 
// Current CLOCK_MONOTONIC reading collapsed into a single nanosecond count.
inline uint64_t GetClockNanos() {
  timespec now;
  clock_gettime(CLOCK_MONOTONIC, &now);
  uint64_t nanos = now.tv_sec * std::nano::den;
  nanos += now.tv_nsec;
  return nanos;
}
93 
// Number of CQEs the kernel has published but we have not yet consumed.
// The acquire-load of the tail pairs with the kernel's release-store so the
// CQE payloads are visible before we copy them.
inline unsigned CQReadyCount(const io_uring& ring) {
  return io_uring_smp_load_acquire(ring.cq.ktail) - *ring.cq.khead;
}
97 
98 unsigned IoRingPeek(const io_uring& ring, io_uring_cqe* cqes, unsigned count) {
99  unsigned ready = CQReadyCount(ring);
100  if (!ready)
101  return 0;
102 
103  count = count > ready ? ready : count;
104  unsigned head = *ring.cq.khead;
105  unsigned mask = *ring.cq.kring_mask;
106  unsigned last = head + count;
107  for (int i = 0; head != last; head++, i++) {
108  cqes[i] = ring.cq.cqes[head & mask];
109  }
110  return count;
111 }
112 
// user_data values below kUserDataCbIndex are reserved for internal use.
constexpr uint64_t kIgnoreIndex = 0;  // Completion carries no callback.
constexpr uint64_t kWakeIndex = 1;    // Completion of the wake_fd_ poll.
// Callback-carrying submissions encode (slot index + kUserDataCbIndex).
constexpr uint64_t kUserDataCbIndex = 1024;
// Idle-loop iterations before falling back to a blocking wait_for_cqe.
constexpr uint32_t kSpinLimit = 200;
117 
118 } // namespace
119 
// Per-thread info (cached monotonic time, proactor-thread flag) for the
// thread running this proactor's loop.
thread_local Proactor::TLInfo Proactor::tl_info_;
121 
122 Proactor::Proactor() : task_queue_(128) {
123  wake_fd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
124  CHECK_GT(wake_fd_, 0);
125 
126  volatile ctx::fiber dummy; // For some weird reason I need this to pull
127  // boost::context into linkage.
128 }
129 
Proactor::~Proactor() {
  CHECK(is_stopped_);
  // The ring is only created in Init(); tear it down only if Init actually
  // ran (thread_id_ is assigned at the end of Init).
  if (thread_id_ != -1U) {
    io_uring_queue_exit(&ring_);
  }
  close(wake_fd_);

  // Unregister this proactor from the process-wide signal registry so the
  // handler never dereferences a destroyed object.
  signal_state* ss = get_signal_state();
  for (size_t i = 0; i < arraysize(ss->signal_map); ++i) {
    if (ss->signal_map[i].proactor == this) {
      ss->signal_map[i].proactor = nullptr;
      ss->signal_map[i].cb = nullptr;
    }
  }
}
145 
147  AsyncBrief([this] { is_stopped_ = true; });
148  VLOG(1) << "Proactor::StopFinish";
149 }
150 
// The proactor event loop. Must run on the thread that owns this Proactor.
// Each iteration: submit pending SQEs, drain the short-task queue (with a
// 500usec CPU quota), dispatch completions, let ready fibers run, then spin
// briefly and finally block in wait_for_cqe when fully idle.
void Proactor::Run(unsigned ring_depth, int wq_fd) {
  VLOG(1) << "Proactor::Run";
  Init(ring_depth, wq_fd);

  // Install the uring-aware fiber scheduling algorithm on this thread and
  // name the main-loop fiber.
  main_loop_ctx_ = fibers::context::active();
  fibers::scheduler* sched = main_loop_ctx_->get_scheduler();

  UringFiberAlgo* scheduler = new UringFiberAlgo(this);
  sched->set_algo(scheduler);  // The scheduler takes ownership of the algo.
  this_fiber::properties<UringFiberProps>().set_name("ioloop");

  is_stopped_ = false;

  constexpr size_t kBatchSize = 64;
  struct io_uring_cqe cqes[kBatchSize];
  uint32_t tq_seq = 0;
  uint32_t num_stalls = 0;  // Times we blocked in wait_for_cqe.
  uint32_t spin_loops = 0, num_task_runs = 0;
  Tasklet task;

  while (true) {
    // 1. Flush pending SQEs to the kernel.
    int num_submitted = io_uring_submit(&ring_);

    if (num_submitted >= 0) {
      if (num_submitted)
        DVLOG(2) << "Submitted " << num_submitted;
    } else if (num_submitted == -EBUSY) {
      // Kernel-side backlog; retry submission on the next iteration.
      VLOG(1) << "EBUSY " << num_submitted;
      num_submitted = 0;
    } else {
      URING_CHECK(num_submitted);
    }

    num_task_runs = 0;

    uint64_t task_start = 0;

    tq_seq = tq_seq_.load(std::memory_order_acquire);

    // This should handle wait-free and "submit-free" short CPU tasks enqued using Async/Await
    // calls. We allocate the quota of 500K nsec (500usec) of CPU time per iteration.
    while (task_queue_.try_dequeue(task)) {
      ++num_task_runs;
      tl_info_.monotonic_time = GetClockNanos();
      task();
      if (task_start == 0) {
        task_start = tl_info_.monotonic_time;
      } else if (task_start + 500000 < tl_info_.monotonic_time) {
        break;  // Quota exhausted; go service I/O before continuing.
      }
    }

    if (num_task_runs) {
      // Should we put 'notify' inside the loop? It might improve the latency.
      task_queue_avail_.notifyAll();
    }

    // 2. Drain completions in batches and run their callbacks.
    uint32_t cqe_count = IoRingPeek(ring_, cqes, kBatchSize);
    if (cqe_count) {
      // Once we copied the data we can mark the cqe consumed.
      io_uring_cq_advance(&ring_, cqe_count);
      DVLOG(2) << "Fetched " << cqe_count << " cqes";
      DispatchCompletions(cqes, cqe_count);
      sqe_avail_.notifyAll();
    }

    if (tq_seq & 1) {  // We allow dispatch fiber to run.
      tq_seq_.fetch_and(~1, std::memory_order_relaxed);
      this_fiber::yield();
    }

    if (sched->has_ready_fibers()) {
      // Suspend this fiber until others will run and get blocked.
      // Eventually UringFiberAlgo will resume back this fiber in suspend_until
      // function.
      DVLOG(2) << "Suspend ioloop";
      tl_info_.monotonic_time = GetClockNanos();
      sched->suspend();

      DVLOG(2) << "Resume ioloop";
      continue;
    }

    if (cqe_count || io_uring_sq_ready(&ring_))
      continue;

    // Lets spin a bit to make a system a bit more responsive.
    if (++spin_loops < kSpinLimit) {
      // pthread_yield(); We should not spin using sched_yield it burns fuckload of cpu.
      continue;
    }

    spin_loops = 0;  // Reset the spinning.

    // 3. Idle: publish the wait state so producers know to WakeRing() us,
    // then block until a CQE arrives.
    // NOTE(review): WAIT_SECTION_STATE is declared outside this view —
    // presumably a header-level constant marking "blocked in kernel";
    // confirm against proactor.h.
    if (tq_seq_.compare_exchange_weak(tq_seq, WAIT_SECTION_STATE, std::memory_order_acquire)) {
      if (is_stopped_)
        break;
      DVLOG(1) << "wait_for_cqe";
      int res = wait_for_cqe(&ring_);
      DVLOG(1) << "Woke up " << res << "/" << tq_seq_.load(std::memory_order_acquire);

      tq_seq = 0;
      ++num_stalls;

      // Reset all except the LSB bit that signals that we need to switch to dispatch fiber.
      tq_seq_.fetch_and(1, std::memory_order_release);
    }
  }

  VLOG(1) << "wakeups/stalls: " << tq_wakeups_.load() << "/" << num_stalls;

  VLOG(1) << "centries size: " << centries_.size();
  centries_.clear();
}
272 
// One-time ring setup; must run on the thread that will call Run().
// ring_size must be a power of two >= 8. If wq_fd > 0, the kernel work
// queue of that existing ring is attached instead of creating a new one.
void Proactor::Init(size_t ring_size, int wq_fd) {
  CHECK_EQ(0, ring_size & (ring_size - 1));  // Power of two.
  CHECK_GE(ring_size, 8);
  CHECK_EQ(0, thread_id_) << "Init was already called";

  io_uring_params params;
  memset(&params, 0, sizeof(params));

  // Optionally reuse the already created work-queue from another uring.
  if (wq_fd > 0) {
    params.flags |= IORING_SETUP_ATTACH_WQ;
    params.wq_fd = wq_fd;
  }

  // it seems that SQPOLL requires registering each fd, including sockets fds.
  // need to check if its worth pursuing.
  // For sure not in short-term.
  // params.flags = IORING_SETUP_SQPOLL;
  URING_CHECK(io_uring_queue_init_params(ring_size, &ring_, &params));
  fast_poll_f_ = (params.features & IORING_FEAT_FAST_POLL) != 0;
  if (!fast_poll_f_) {
    LOG_FIRST_N(INFO, 1) << "IORING_FEAT_FAST_POLL feature is not present in the kernel";
  }

  if (0 == (params.features & IORING_FEAT_NODROP)) {
    LOG_FIRST_N(INFO, 1) << "IORING_FEAT_NODROP feature is not present in the kernel";
  }

  if (params.features & IORING_FEAT_SINGLE_MMAP) {
    size_t sz = ring_.sq.ring_sz + params.sq_entries * sizeof(struct io_uring_sqe);
    LOG_FIRST_N(INFO, 1) << "IORing with " << params.sq_entries << " allocated " << sz
                         << " bytes, cq_entries is " << *ring_.cq.kring_entries;
  }
  CHECK_EQ(ring_size, params.sq_entries);  // Sanity.

  // Arm a poll on wake_fd_ so WakeRing() can interrupt wait_for_cqe();
  // it is rearmed in DispatchCompletions after every wakeup.
  struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
  CHECK_NOTNULL(sqe);

  io_uring_prep_poll_add(sqe, wake_fd_, POLLIN);
  sqe->user_data = kWakeIndex;

  // Thread the completion entries into a free list: entry i links to i+1.
  centries_.resize(params.sq_entries);
  next_free_ = 0;
  for (size_t i = 0; i < centries_.size() - 1; ++i) {
    centries_[i].val = i + 1;
  }

  thread_id_ = pthread_self();
  tl_info_.is_proactor_thread = true;
}
323 
324 void Proactor::WakeRing() {
325  DVLOG(1) << "Wake ring " << tq_seq_.load(std::memory_order_relaxed);
326 
327  tq_wakeups_.fetch_add(1, std::memory_order_relaxed);
328  uint64_t val = 1;
329 
330  CHECK_EQ(8, write(wake_fd_, &val, sizeof(uint64_t)));
331 }
332 
// Routes a batch of copied CQEs: user completion callbacks (user_data >=
// kUserDataCbIndex), the wake-fd rearm path (kWakeIndex), and ignored
// entries (kIgnoreIndex).
void Proactor::DispatchCompletions(io_uring_cqe* cqes, unsigned count) {
  for (unsigned i = 0; i < count; ++i) {
    auto& cqe = cqes[i];

    // I allocate range of 1024 reserved values for the internal Proactor use.

    if (cqe.user_data >= kUserDataCbIndex) {  // our heap range surely starts higher than 1k.
      size_t index = cqe.user_data - kUserDataCbIndex;
      DCHECK_LT(index, centries_.size());
      auto& e = centries_[index];
      DCHECK(e.cb) << index;

      // Move the callback out and return the slot to the free list BEFORE
      // invoking it, so the callback may itself submit new entries.
      CbType func;
      auto payload = e.val;
      func.swap(e.cb);

      e.val = next_free_;
      next_free_ = index;

      func(cqe.res, payload, this);
      continue;
    }

    if (cqe.user_data == kIgnoreIndex)
      continue;

    if (cqe.user_data == kWakeIndex) {
      // We were woken up. Need to rearm wake_fd_ poller.
      DCHECK_GE(cqe.res, 0);

      DVLOG(1) << "Wakeup " << cqe.res << "/" << cqe.flags;

      CHECK_EQ(8, read(wake_fd_, &cqe.user_data, 8));  // Pull the data

      // TODO: to move io_uring_get_sqe call from here to before we stall.
      struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
      CHECK(sqe);  // We just consumed CQEs. I assume it's enough to get an SQE.
      io_uring_prep_poll_add(sqe, wake_fd_, POLLIN);  // And rearm.
      sqe->user_data = kWakeIndex;
    } else {
      LOG(ERROR) << "Unrecognized user_data " << cqe.user_data;
    }
  }
}
377 
// Returns an SQE wrapper for a new I/O request. If 'cb' is set it will be
// invoked from DispatchCompletions with (cqe result, payload, this) when
// the operation completes; otherwise the completion is dropped.
// May suspend the calling fiber (never the main-loop fiber) until SQ space
// frees up.
SubmitEntry Proactor::GetSubmitEntry(CbType cb, int64_t payload) {
  io_uring_sqe* res = io_uring_get_sqe(&ring_);
  if (res == NULL) {
    fibers::context* current = fibers::context::active();
    // The main loop is the one that frees SQ space; it must never block here.
    CHECK(current != main_loop_ctx_) << "SQE overflow in the main context";

    sqe_avail_.await([this] { return io_uring_sq_space_left(&ring_) > 0; });
    res = io_uring_get_sqe(&ring_);  // now we should have the space.
    CHECK(res);
  }

  if (cb) {
    if (next_free_ < 0) {
      RegrowCentries();
      DCHECK_GT(next_free_, 0);
    }

    res->user_data = next_free_ + kUserDataCbIndex;
    DCHECK_LT(next_free_, centries_.size());

    // Pop a slot off the completion-entry free list and stash the callback.
    auto& e = centries_[next_free_];
    DCHECK(!e.cb);  // cb is undefined.
    DVLOG(1) << "GetSubmitEntry: index: " << next_free_ << ", socket: " << payload;

    next_free_ = e.val;
    e.cb = std::move(cb);
    e.val = payload;
    e.opcode = -1;
  } else {
    res->user_data = kIgnoreIndex;
  }

  return SubmitEntry{res};
}
412 
413 void Proactor::RegisterSignal(std::initializer_list<uint16_t> l, std::function<void(int)> cb) {
414  auto* state = get_signal_state();
415 
416  struct sigaction sa;
417  memset(&sa, 0, sizeof(sa));
418 
419  if (cb) {
420  sa.sa_flags = SA_SIGINFO;
421  sa.sa_sigaction = &SigAction;
422 
423  for (uint16_t val : l) {
424  CHECK(!state->signal_map[val].cb) << "Signal " << val << " was already registered";
425  state->signal_map[val].cb = cb;
426  state->signal_map[val].proactor = this;
427 
428  CHECK_EQ(0, sigaction(val, &sa, NULL));
429  }
430  } else {
431  sa.sa_handler = SIG_DFL;
432 
433  for (uint16_t val : l) {
434  CHECK(state->signal_map[val].cb) << "Signal " << val << " was already registered";
435  state->signal_map[val].cb = nullptr;
436  state->signal_map[val].proactor = nullptr;
437 
438  CHECK_EQ(0, sigaction(val, &sa, NULL));
439  }
440  }
441 }
442 
443 void Proactor::RegrowCentries() {
444  size_t prev = centries_.size();
445  VLOG(1) << "RegrowCentries from " << prev << " to " << prev * 2;
446 
447  centries_.resize(prev * 2); // grow by 2.
448  next_free_ = prev;
449  for (; prev < centries_.size() - 1; ++prev)
450  centries_[prev].val = prev + 1;
451 }
452 
453 } // namespace uring
454 } // namespace util
// --- Doxygen cross-reference residue from the extracted source listing ---
// bool await(Condition condition)                  Definition: event_count.h:178
// void Run(unsigned ring_depth=512, int wq_fd=-1)  Definition: proactor.cc:151
// void AsyncBrief(Func &&brief)                    Definition: proactor.h:229
// SubmitEntry GetSubmitEntry(CbType cb, int64_t payload)
//   Get the Submit Entry object in order to issue I/O request.
//                                                  Definition: proactor.cc:378
// void Stop()
//   Signals proactor to stop. Does not wait for it.
//                                                  Definition: proactor.cc:146