5 #include "util/uring/proactor.h"     9 #include <sys/eventfd.h>    12 #include <boost/fiber/operations.hpp>    13 #include <boost/fiber/scheduler.hpp>    15 #include "absl/base/attributes.h"    16 #include "base/logging.h"    17 #include "base/macros.h"    18 #include "util/uring/uring_fiber_algo.h"    20 #define URING_CHECK(x)                                                           \    22     int __res_val = (x);                                                         \    23     if (UNLIKELY(__res_val < 0)) {                                               \    25       char* str = strerror_r(-__res_val, buf, sizeof(buf));                      \    26       LOG(FATAL) << "Error " << (-__res_val) << " evaluating '" #x "': " << str; \    30 #ifndef __NR_io_uring_enter    31 #define __NR_io_uring_enter 426    34 using namespace boost;
    35 namespace ctx = boost::context;
    // Thin wrapper over the raw io_uring_enter(2) system call.
    //
    // fd           - file descriptor of the io_uring instance.
    // to_submit    - number of submission entries to hand to the kernel.
    // min_complete - number of completions to wait for.
    // flags        - IORING_ENTER_* flags.
    // sig          - signal mask forwarded to the kernel; may be null.
    //
    // Returns the raw syscall() result, i.e. -1 with errno set on failure.
    inline int sys_io_uring_enter(int fd, unsigned to_submit, unsigned min_complete, unsigned flags,
                                  sigset_t* sig) {
      // The trailing argument is the size of the signal mask in bytes.
      return syscall(__NR_io_uring_enter, fd, to_submit, min_complete, flags, sig, _NSIG / 8);
    }
    // Enters the kernel via sys_io_uring_enter() with IORING_ENTER_GETEVENTS, submitting
    // nothing (to_submit = 0) and asking for at least one completion (min_complete = 1).
    // A zero result or errno == EINTR takes the non-fatal path; otherwise the call ends in
    // LOG(FATAL).
    // NOTE(review): the statements between the EINTR check and the LOG(FATAL) are not
    // visible here. strerror() expects an errno value, so confirm that `res` holds errno
    // (not the raw return value) by the time it is logged.
    47 ABSL_ATTRIBUTE_NOINLINE 
int wait_for_cqe(io_uring* ring, sigset_t* sig = NULL) {
    49   int res = sys_io_uring_enter(ring->ring_fd, 0, 1, IORING_ENTER_GETEVENTS, sig);
    50   if (res == 0 || errno == EINTR)
    55   LOG(FATAL) << 
"Error " << (res) << 
" evaluating sys_io_uring_enter: " << strerror(res);
    61     Proactor* proactor = 
nullptr;
    62     std::function<void(
int)> cb;
    65   Item signal_map[_NSIG];
    68 signal_state* get_signal_state() {
    69   static signal_state state;
    74 void SigAction(
int signal, siginfo_t*, 
void*) {
    75   signal_state* state = get_signal_state();
    76   DCHECK_LT(signal, _NSIG);
    78   auto& item = state->signal_map[signal];
    79   auto cb = [signal, &item] { item.cb(signal); };
    81   if (item.proactor && item.cb) {
    82     item.proactor->AsyncFiber(std::move(cb));
    84     LOG(ERROR) << 
"Tangling signal handler " << signal;
    // Returns the current CLOCK_MONOTONIC reading expressed in nanoseconds.
    inline uint64_t GetClockNanos() {
      timespec ts;
      clock_gettime(CLOCK_MONOTONIC, &ts);
      // std::nano::den is 1'000'000'000, i.e. nanoseconds per second.
      return ts.tv_sec * std::nano::den + ts.tv_nsec;
    }
    94 inline unsigned CQReadyCount(
const io_uring& ring) {
    95   return io_uring_smp_load_acquire(ring.cq.ktail) - *ring.cq.khead;
    98 unsigned IoRingPeek(
const io_uring& ring, io_uring_cqe* cqes, 
unsigned count) {
    99   unsigned ready = CQReadyCount(ring);
   103   count = count > ready ? ready : count;
   104   unsigned head = *ring.cq.khead;
   105   unsigned mask = *ring.cq.kring_mask;
   106   unsigned last = head + count;
   107   for (
int i = 0; head != last; head++, i++) {
   108     cqes[i] = ring.cq.cqes[head & mask];
   // user_data value assigned to submissions that carry no completion callback.
   constexpr uint64_t kIgnoreIndex = 0;
   // user_data value of the poll request armed on wake_fd_.
   constexpr uint64_t kWakeIndex = 1;
   // Offset added to a centries_ index to form the user_data of a callback-bearing request.
   constexpr uint64_t kUserDataCbIndex = 1024;
   // Upper bound compared against the main loop's spin_loops counter.
   constexpr uint32_t kSpinLimit = 200;
   120 thread_local Proactor::TLInfo Proactor::tl_info_;
   122 Proactor::Proactor() : task_queue_(128) {
   123   wake_fd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
   124   CHECK_GT(wake_fd_, 0);
   126   volatile ctx::fiber dummy;  
   // Tears down the ring and removes every signal registration that points at this proactor.
   130 Proactor::~Proactor() {
   // NOTE(review): Init() treats thread_id_ == 0 as "not yet initialized", so this -1U
   // comparison also holds for a proactor whose Init() never ran - confirm the sentinel.
   132   if (thread_id_ != -1U) {
   133     io_uring_queue_exit(&ring_);
   // Clear registry slots owned by this instance so SigAction() no longer dispatches to it.
   137   signal_state* ss = get_signal_state();
   138   for (
size_t i = 0; i < arraysize(ss->signal_map); ++i) {
   139     if (ss->signal_map[i].proactor == 
this) {
   140       ss->signal_map[i].proactor = 
nullptr;
   141       ss->signal_map[i].cb = 
nullptr;
   // NOTE(review): the enclosing function header is not visible here; judging by the
   // message this is presumably the closing trace of Proactor::Stop() - confirm.
   148   VLOG(1) << 
"Proactor::StopFinish";
   // Proactor main loop: initializes the ring, then repeatedly submits queued SQEs, drains
   // task_queue_, dispatches completions and, when there is nothing to do, waits in
   // wait_for_cqe().
   // NOTE(review): the function header and a number of statements are not visible here, so
   // the comments below describe only the statements that are.
   152   VLOG(1) << 
"Proactor::Run";
   153   Init(ring_depth, wq_fd);
   // Remember the fiber context that runs the loop and fetch its scheduler.
   155   main_loop_ctx_ = fibers::context::active();
   156   fibers::scheduler* sched = main_loop_ctx_->get_scheduler();
   // NOTE(review): `scheduler` is created on a line that is not visible here.
   159   sched->set_algo(scheduler);
   160   this_fiber::properties<UringFiberProps>().set_name(
"ioloop");
   // Completions are copied out of the ring in batches of at most kBatchSize entries.
   164   constexpr 
size_t kBatchSize = 64;
   165   struct io_uring_cqe cqes[kBatchSize];
   167   uint32_t num_stalls = 0;
   168   uint32_t spin_loops = 0, num_task_runs = 0;
   // Hand pending submission entries to the kernel.
   172     int num_submitted = io_uring_submit(&ring_);
   174     if (num_submitted >= 0) {
   176         DVLOG(2) << 
"Submitted " << num_submitted;
   // -EBUSY is only logged; any other negative result goes through URING_CHECK.
   177     } 
else if (num_submitted == -EBUSY) {
   178       VLOG(1) << 
"EBUSY " << num_submitted;
   181       URING_CHECK(num_submitted);
   186     uint64_t task_start = 0;
   // Sample the queue sequence; it is the expected value of the compare-exchange below.
   188     tq_seq = tq_seq_.load(std::memory_order_acquire);
   // Drain queued tasks, refreshing the cached monotonic clock on every iteration.
   192     while (task_queue_.try_dequeue(task)) {
   194       tl_info_.monotonic_time = GetClockNanos();
   // task_start records when draining began; 500000 ns (0.5 ms) is the elapsed-time
   // threshold. The action taken once it is exceeded is not visible here.
   196       if (task_start == 0) {
   197         task_start = tl_info_.monotonic_time;
   198       } 
else if (task_start + 500000 < tl_info_.monotonic_time) {
   205       task_queue_avail_.notifyAll();
   // Copy ready completions, release their ring slots, then run the callbacks.
   208     uint32_t cqe_count = IoRingPeek(ring_, cqes, kBatchSize);
   211       io_uring_cq_advance(&ring_, cqe_count);
   212       DVLOG(2) << 
"Fetched " << cqe_count << 
" cqes";
   213       DispatchCompletions(cqes, cqe_count);
   // Wake waiters on sqe_avail_ (the SQE-allocation path awaits it when the ring is full).
   214       sqe_avail_.notifyAll();
   // Clears the lowest bit of tq_seq_.
   218       tq_seq_.fetch_and(~1, std::memory_order_relaxed);
   222     if (sched->has_ready_fibers()) {
   226       DVLOG(2) << 
"Suspend ioloop";
   227       tl_info_.monotonic_time = GetClockNanos();
   230       DVLOG(2) << 
"Resume ioloop";
   234     if (cqe_count || io_uring_sq_ready(&ring_))
   // Spin for up to kSpinLimit iterations.
   238     if (++spin_loops < kSpinLimit) {
   // Enter the wait section only if tq_seq_ still equals the local tq_seq.
   252     if (tq_seq_.compare_exchange_weak(tq_seq, WAIT_SECTION_STATE, std::memory_order_acquire)) {
   255       DVLOG(1) << 
"wait_for_cqe";
   256       int res = wait_for_cqe(&ring_);
   257       DVLOG(1) << 
"Woke up " << res << 
"/" << tq_seq_.load(std::memory_order_acquire);
   // Keeps only the lowest bit of tq_seq_.
   263       tq_seq_.fetch_and(1, std::memory_order_release);
   267   VLOG(1) << 
"wakeups/stalls: " << tq_wakeups_.load() << 
"/" << num_stalls;
   269   VLOG(1) << 
"centries size: " << centries_.size();
   // Creates the io_uring instance, arms the wake-up poll on wake_fd_ and sets up the
   // completion-entry table.
   //
   // ring_size - requested number of submission entries; must be a power of two and >= 8.
   // wq_fd     - ring descriptor stored in params.wq_fd when IORING_SETUP_ATTACH_WQ is
   //             requested (the guarding condition is not visible here).
   273 void Proactor::Init(
size_t ring_size, 
int wq_fd) {
   // Power-of-two check.
   274   CHECK_EQ(0, ring_size & (ring_size - 1));
   275   CHECK_GE(ring_size, 8);
   // thread_id_ == 0 is treated as "Init has not run yet".
   276   CHECK_EQ(0, thread_id_) << 
"Init was already called";
   278   io_uring_params params;
   // NOTE(review): `¶ms` here and in the io_uring_queue_init_params() call below is a
   // mis-encoded `&params`.
   279   memset(¶ms, 0, 
sizeof(params));
   283     params.flags |= IORING_SETUP_ATTACH_WQ;
   284     params.wq_fd = wq_fd;
   291   URING_CHECK(io_uring_queue_init_params(ring_size, &ring_, ¶ms));
   // Record whether the kernel advertises IORING_FEAT_FAST_POLL.
   292   fast_poll_f_ = (params.features & IORING_FEAT_FAST_POLL) != 0;
   294     LOG_FIRST_N(INFO, 1) << 
"IORING_FEAT_FAST_POLL feature is not present in the kernel";
   297   if (0 == (params.features & IORING_FEAT_NODROP)) {
   298     LOG_FIRST_N(INFO, 1) << 
"IORING_FEAT_NODROP feature is not present in the kernel";
   301   if (params.features & IORING_FEAT_SINGLE_MMAP) {
   302     size_t sz = ring_.sq.ring_sz + params.sq_entries * 
sizeof(
struct io_uring_sqe);
   303     LOG_FIRST_N(INFO, 1) << 
"IORing with " << params.sq_entries << 
" allocated " << sz
   304                          << 
" bytes, cq_entries is " << *ring_.cq.kring_entries;
   // The kernel must have granted exactly the requested number of submission entries.
   306   CHECK_EQ(ring_size, params.sq_entries);  
   // Arm a POLLIN poll on wake_fd_, tagged with kWakeIndex.
   308   struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
   311   io_uring_prep_poll_add(sqe, wake_fd_, POLLIN);
   312   sqe->user_data = kWakeIndex;
   // One completion entry per submission entry; each entry except the last stores the
   // index of its successor in `val`.
   314   centries_.resize(params.sq_entries);
   316   for (
size_t i = 0; i < centries_.size() - 1; ++i) {
   317     centries_[i].val = i + 1;
   // Bind this proactor to the calling thread.
   320   thread_id_ = pthread_self();
   321   tl_info_.is_proactor_thread = 
true;
   // Counts a wake-up in tq_wakeups_ and writes 8 bytes to wake_fd_; the CHECK requires
   // that all 8 bytes were written.
   // NOTE(review): the declaration of `val` is on a line that is not visible here.
   324 void Proactor::WakeRing() {
   325   DVLOG(1) << 
"Wake ring " << tq_seq_.load(std::memory_order_relaxed);
   327   tq_wakeups_.fetch_add(1, std::memory_order_relaxed);
   330   CHECK_EQ(8, write(wake_fd_, &val, 
sizeof(uint64_t)));
   // Processes `count` completion entries from `cqes`, routing each by its user_data value.
   // NOTE(review): several statements of this function are not visible here (including the
   // declarations of `cqe` and `func`); the comments cover the visible ones only.
   333 void Proactor::DispatchCompletions(io_uring_cqe* cqes, 
unsigned count) {
   334   for (
unsigned i = 0; i < count; ++i) {
   // user_data >= kUserDataCbIndex identifies a request with a registered callback;
   // subtracting the offset yields its centries_ index.
   339     if (cqe.user_data >= kUserDataCbIndex) {  
   340       size_t index = cqe.user_data - kUserDataCbIndex;
   341       DCHECK_LT(index, centries_.size());
   342       auto& e = centries_[index];
   343       DCHECK(e.cb) << index;
   346       auto payload = e.val;
   // The callback receives the completion result, the stored payload and this proactor.
   352       func(cqe.res, payload, 
this);
   356     if (cqe.user_data == kIgnoreIndex)
   // Wake-up event: read 8 bytes from the eventfd and arm the poll again.
   359     if (cqe.user_data == kWakeIndex) {
   361       DCHECK_GE(cqe.res, 0);
   363       DVLOG(1) << 
"Wakeup " << cqe.res << 
"/" << cqe.flags;
   // cqe.user_data is used here merely as an 8-byte destination buffer.
   365       CHECK_EQ(8, read(wake_fd_, &cqe.user_data, 8));  
   368       struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
   370       io_uring_prep_poll_add(sqe, wake_fd_, POLLIN);  
   371       sqe->user_data = kWakeIndex;
   373       LOG(ERROR) << 
"Unrecognized user_data " << cqe.user_data;
   // Obtains a submission queue entry and tags it with either a completion-table slot or
   // kIgnoreIndex.
   // NOTE(review): the function header and the branch conditions are not visible here;
   // the name is taken from the log message below.
   379   io_uring_sqe* res = io_uring_get_sqe(&ring_);
   // The fiber running the main loop must never end up waiting for a free SQE.
   381     fibers::context* current = fibers::context::active();
   382     CHECK(current != main_loop_ctx_) << 
"SQE overflow in the main context";
   // Wait until the submission queue has room, then fetch an entry again.
   384     sqe_avail_.
await([
this] { 
return io_uring_sq_space_left(&ring_) > 0; });
   385     res = io_uring_get_sqe(&ring_);  
   390     if (next_free_ < 0) {
   392       DCHECK_GT(next_free_, 0);
   // user_data encodes the completion-table index shifted by kUserDataCbIndex.
   395     res->user_data = next_free_ + kUserDataCbIndex;
   396     DCHECK_LT(next_free_, centries_.size());
   398     auto& e = centries_[next_free_];
   400     DVLOG(1) << 
"GetSubmitEntry: index: " << next_free_ << 
", socket: " << payload;
   403     e.cb = std::move(cb);
   // Entries tagged kIgnoreIndex have no callback slot.
   407     res->user_data = kIgnoreIndex;
   413 void Proactor::RegisterSignal(std::initializer_list<uint16_t> l, std::function<
void(
int)> cb) {
   414   auto* state = get_signal_state();
   417   memset(&sa, 0, 
sizeof(sa));
   420     sa.sa_flags = SA_SIGINFO;
   421     sa.sa_sigaction = &SigAction;
   423     for (uint16_t val : l) {
   424       CHECK(!state->signal_map[val].cb) << 
"Signal " << val << 
" was already registered";
   425       state->signal_map[val].cb = cb;
   426       state->signal_map[val].proactor = 
this;
   428       CHECK_EQ(0, sigaction(val, &sa, NULL));
   431     sa.sa_handler = SIG_DFL;
   433     for (uint16_t val : l) {
   434       CHECK(state->signal_map[val].cb) << 
"Signal " << val << 
" was already registered";
   435       state->signal_map[val].cb = 
nullptr;
   436       state->signal_map[val].proactor = 
nullptr;
   438       CHECK_EQ(0, sigaction(val, &sa, NULL));
   // Doubles the size of centries_ and links the newly added entries: each new entry
   // except the last stores the index of its successor in `val`.
   // NOTE(review): a statement between the resize and the loop is not visible here.
   443 void Proactor::RegrowCentries() {
   444   size_t prev = centries_.size();
   445   VLOG(1) << 
"RegrowCentries from " << prev << 
" to " << prev * 2;
   447   centries_.resize(prev * 2);  
   // `prev` starts at the old size, i.e. the index of the first new entry.
   449   for (; prev < centries_.size() - 1; ++prev)
   450     centries_[prev].val = prev + 1;
 bool await(Condition condition)
 
void Run(unsigned ring_depth=512, int wq_fd=-1)
 
void AsyncBrief(Func &&brief)
 
SubmitEntry GetSubmitEntry(CbType cb, int64_t payload)
Gets the SubmitEntry object that is used to issue an I/O request.
 
void Stop()
Signals proactor to stop. Does not wait for it.