// util/uring/proactor.cc — io_uring based proactor implementation.
// NOTE(review): this view collapsed the header region; the URING_CHECK macro
// below is visibly truncated by the extraction (the do { } while(0) wrapper,
// the `char buf[...]` declaration used by strerror_r, and the closing braces
// plus the matching #endif for __NR_io_uring_enter are missing).
#include "util/uring/proactor.h"

#include <sys/eventfd.h>

#include <boost/fiber/operations.hpp>
#include <boost/fiber/scheduler.hpp>

#include "absl/base/attributes.h"
#include "base/logging.h"
#include "base/macros.h"
#include "util/uring/uring_fiber_algo.h"

// Evaluates x (a call returning a negative errno on failure) and aborts with
// a decoded error message when it fails.
#define URING_CHECK(x) \
  int __res_val = (x); \
  if (UNLIKELY(__res_val < 0)) { \
    char* str = strerror_r(-__res_val, buf, sizeof(buf)); \
    LOG(FATAL) << "Error " << (-__res_val) << " evaluating '" #x "': " << str; \

// Fallback syscall number for kernels whose headers predate io_uring.
#ifndef __NR_io_uring_enter
#define __NR_io_uring_enter 426

using namespace boost;
namespace ctx = boost::context;
// Thin wrapper over the raw io_uring_enter(2) syscall.
// NOTE(review): the `sigset_t* sig` parameter was dropped from the signature
// in the garbled view although the body references it (and the call site in
// wait_for_cqe passes it); restored. The kernel applies `sig` atomically
// around the wait, which is why it is forwarded rather than set separately.
inline int sys_io_uring_enter(int fd, unsigned to_submit, unsigned min_complete, unsigned flags,
                              sigset_t* sig) {
  // _NSIG / 8 is the byte size of the kernel's sigset representation.
  return syscall(__NR_io_uring_enter, fd, to_submit, min_complete, flags, sig, _NSIG / 8);
}
// Blocks in io_uring_enter until at least one completion entry is available
// on `ring` (to_submit = 0: this call only waits, submission happens in the
// main loop). Returns the syscall result; EINTR is treated as a normal wakeup.
ABSL_ATTRIBUTE_NOINLINE
int wait_for_cqe(io_uring* ring, sigset_t* sig = NULL) {
  int res = sys_io_uring_enter(ring->ring_fd, 0, 1, IORING_ENTER_GETEVENTS, sig);
  if (res == 0 || errno == EINTR)
    // NOTE(review): the success-path return statement between this condition
    // and the LOG below was dropped by the extraction.
    LOG(FATAL) <<
        "Error " << (res) <<
        // NOTE(review): `res` here is the raw syscall result (-1 on failure),
        // so strerror(res) looks wrong — presumably errno (captured into res
        // on a dropped line) was intended; confirm against VCS history.
        " evaluating sys_io_uring_enter: " << strerror(res);
61 Proactor* proactor =
nullptr;
62 std::function<void(
int)> cb;
65 Item signal_map[_NSIG];
68 signal_state* get_signal_state() {
69 static signal_state state;
74 void SigAction(
int signal, siginfo_t*,
void*) {
75 signal_state* state = get_signal_state();
76 DCHECK_LT(signal, _NSIG);
78 auto& item = state->signal_map[signal];
79 auto cb = [signal, &item] { item.cb(signal); };
81 if (item.proactor && item.cb) {
82 item.proactor->AsyncFiber(std::move(cb));
84 LOG(ERROR) <<
"Tangling signal handler " << signal;
// Returns CLOCK_MONOTONIC time in nanoseconds since an unspecified epoch.
// Suitable for measuring intervals; not wall-clock time.
inline uint64_t GetClockNanos() {
  // NOTE(review): this declaration was dropped in the garbled view; restored.
  timespec ts;
  clock_gettime(CLOCK_MONOTONIC, &ts);
  // std::nano::den == 1'000'000'000.
  return ts.tv_sec * std::nano::den + ts.tv_nsec;
}
94 inline unsigned CQReadyCount(
const io_uring& ring) {
95 return io_uring_smp_load_acquire(ring.cq.ktail) - *ring.cq.khead;
98 unsigned IoRingPeek(
const io_uring& ring, io_uring_cqe* cqes,
unsigned count) {
99 unsigned ready = CQReadyCount(ring);
103 count = count > ready ? ready : count;
104 unsigned head = *ring.cq.khead;
105 unsigned mask = *ring.cq.kring_mask;
106 unsigned last = head + count;
107 for (
int i = 0; head != last; head++, i++) {
108 cqes[i] = ring.cq.cqes[head & mask];
// user_data tags for SQEs not tied to a per-request callback: completions
// tagged kIgnoreIndex are skipped by DispatchCompletions, kWakeIndex marks
// the eventfd wake-up poll.
constexpr uint64_t kIgnoreIndex = 0;
constexpr uint64_t kWakeIndex = 1;

// user_data values >= kUserDataCbIndex encode (centries_ slot + offset);
// DispatchCompletions subtracts the offset to recover the slot index.
constexpr uint64_t kUserDataCbIndex = 1024;

// How many idle loop iterations Run() spins before blocking in the kernel.
constexpr uint32_t kSpinLimit = 200;

// Per-thread state for the proactor that owns the current thread.
thread_local Proactor::TLInfo Proactor::tl_info_;
122 Proactor::Proactor() : task_queue_(128) {
123 wake_fd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
124 CHECK_GT(wake_fd_, 0);
126 volatile ctx::fiber dummy;
130 Proactor::~Proactor() {
132 if (thread_id_ != -1U) {
133 io_uring_queue_exit(&ring_);
137 signal_state* ss = get_signal_state();
138 for (
size_t i = 0; i < arraysize(ss->signal_map); ++i) {
139 if (ss->signal_map[i].proactor ==
this) {
140 ss->signal_map[i].proactor =
nullptr;
141 ss->signal_map[i].cb =
nullptr;
// NOTE(review): orphaned tail of what the doc index at the end of the file
// calls Proactor::Stop() ("Signals proactor to stop. Does not wait for it.").
// The signature and the statement(s) that set the stop flag are not visible
// in this garbled view.
VLOG(1) << "Proactor::StopFinish";
// -- Fragment of Proactor::Run(unsigned ring_depth = 512, int wq_fd = -1) --
// The proactor's main event loop: submit SQEs, drain the task queue, reap
// CQEs, let ready fibers run, spin briefly, then block in the kernel.
// NOTE(review): the function signature, the main `while` loop header, the
// dequeued task invocation, several closing braces and the loop-exit logic
// were dropped by the extraction; this fragment is NOT compilable as-is.
VLOG(1) << "Proactor::Run";
Init(ring_depth, wq_fd);

// Bind the calling thread's fiber scheduler to this proactor.
main_loop_ctx_ = fibers::context::active();
fibers::scheduler* sched = main_loop_ctx_->get_scheduler();

// `scheduler` is declared on a dropped line — presumably a UringFiberAlgo
// instance (see the uring_fiber_algo.h include); confirm.
sched->set_algo(scheduler);
this_fiber::properties<UringFiberProps>().set_name("ioloop");

constexpr size_t kBatchSize = 64;
struct io_uring_cqe cqes[kBatchSize];  // batch buffer for reaped completions

uint32_t num_stalls = 0;
uint32_t spin_loops = 0, num_task_runs = 0;

// Flush pending SQEs to the kernel.
int num_submitted = io_uring_submit(&ring_);

if (num_submitted >= 0) {
  DVLOG(2) << "Submitted " << num_submitted;
} else if (num_submitted == -EBUSY) {
  // Kernel completion queue saturated — retry later rather than aborting.
  VLOG(1) << "EBUSY " << num_submitted;

// Any other negative result is fatal.
URING_CHECK(num_submitted);

uint64_t task_start = 0;

// `tq_seq` is declared on a dropped line; sampled here and used in the CAS
// below to detect wake-ups racing with the sleep decision.
tq_seq = tq_seq_.load(std::memory_order_acquire);

// Drain the cross-thread task queue (the invocation of the dequeued `task`
// is on a dropped line).
while (task_queue_.try_dequeue(task)) {
  tl_info_.monotonic_time = GetClockNanos();
  if (task_start == 0) {
    task_start = tl_info_.monotonic_time;
  } else if (task_start + 500000 < tl_info_.monotonic_time) {
    // Ran tasks for more than 0.5ms — presumably breaks out so that I/O can
    // make progress; the break statement is on a dropped line.

// Wake producers blocked on a full task queue.
task_queue_avail_.notifyAll();

// Reap up to kBatchSize completions without blocking.
uint32_t cqe_count = IoRingPeek(ring_, cqes, kBatchSize);

io_uring_cq_advance(&ring_, cqe_count);
DVLOG(2) << "Fetched " << cqe_count << " cqes";
DispatchCompletions(cqes, cqe_count);

// Freed CQ space implies SQE slots may now be available for blocked fibers.
sqe_avail_.notifyAll();

// Clears all bits except bit 0 before re-checking for work.
tq_seq_.fetch_and(~1, std::memory_order_relaxed);

if (sched->has_ready_fibers()) {
  DVLOG(2) << "Suspend ioloop";
  tl_info_.monotonic_time = GetClockNanos();
  // (the actual suspend/resume of the ioloop fiber is on dropped lines)
  DVLOG(2) << "Resume ioloop";

// Work remains — skip the sleep path (continue on a dropped line).
if (cqe_count || io_uring_sq_ready(&ring_))

// Spin a bounded number of iterations before blocking in the kernel.
if (++spin_loops < kSpinLimit) {

// Transition into the wait section only if no producer bumped tq_seq_ since
// we sampled it; otherwise loop again.
if (tq_seq_.compare_exchange_weak(tq_seq, WAIT_SECTION_STATE, std::memory_order_acquire)) {
  DVLOG(1) << "wait_for_cqe";
  int res = wait_for_cqe(&ring_);
  DVLOG(1) << "Woke up " << res << "/" << tq_seq_.load(std::memory_order_acquire);

  // NOTE(review): fetch_and(1) keeps ONLY bit 0 — the mirror image of the
  // fetch_and(~1) above. Verify this asymmetry is intended (clearing the
  // WAIT_SECTION_STATE bits while preserving a pending-notification bit).
  tq_seq_.fetch_and(1, std::memory_order_release);

// Loop-exit statistics (the loop terminator itself is on dropped lines).
VLOG(1) << "wakeups/stalls: " << tq_wakeups_.load() << "/" << num_stalls;
VLOG(1) << "centries size: " << centries_.size();
273 void Proactor::Init(
size_t ring_size,
int wq_fd) {
274 CHECK_EQ(0, ring_size & (ring_size - 1));
275 CHECK_GE(ring_size, 8);
276 CHECK_EQ(0, thread_id_) <<
"Init was already called";
278 io_uring_params params;
279 memset(¶ms, 0,
sizeof(params));
283 params.flags |= IORING_SETUP_ATTACH_WQ;
284 params.wq_fd = wq_fd;
291 URING_CHECK(io_uring_queue_init_params(ring_size, &ring_, ¶ms));
292 fast_poll_f_ = (params.features & IORING_FEAT_FAST_POLL) != 0;
294 LOG_FIRST_N(INFO, 1) <<
"IORING_FEAT_FAST_POLL feature is not present in the kernel";
297 if (0 == (params.features & IORING_FEAT_NODROP)) {
298 LOG_FIRST_N(INFO, 1) <<
"IORING_FEAT_NODROP feature is not present in the kernel";
301 if (params.features & IORING_FEAT_SINGLE_MMAP) {
302 size_t sz = ring_.sq.ring_sz + params.sq_entries *
sizeof(
struct io_uring_sqe);
303 LOG_FIRST_N(INFO, 1) <<
"IORing with " << params.sq_entries <<
" allocated " << sz
304 <<
" bytes, cq_entries is " << *ring_.cq.kring_entries;
306 CHECK_EQ(ring_size, params.sq_entries);
308 struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
311 io_uring_prep_poll_add(sqe, wake_fd_, POLLIN);
312 sqe->user_data = kWakeIndex;
314 centries_.resize(params.sq_entries);
316 for (
size_t i = 0; i < centries_.size() - 1; ++i) {
317 centries_[i].val = i + 1;
320 thread_id_ = pthread_self();
321 tl_info_.is_proactor_thread =
true;
324 void Proactor::WakeRing() {
325 DVLOG(1) <<
"Wake ring " << tq_seq_.load(std::memory_order_relaxed);
327 tq_wakeups_.fetch_add(1, std::memory_order_relaxed);
330 CHECK_EQ(8, write(wake_fd_, &val,
sizeof(uint64_t)));
// Routes a batch of CQEs: entries tagged >= kUserDataCbIndex invoke the
// callback stored in the corresponding centries_ slot; kWakeIndex re-arms
// the eventfd poll; kIgnoreIndex entries are skipped; anything else is logged.
// NOTE(review): garbled fragment — the `cqe` loop-variable binding, the
// callback extraction into `func`, free-list maintenance, `continue`
// statements and several braces were dropped by the extraction.
void Proactor::DispatchCompletions(io_uring_cqe* cqes, unsigned count) {
  for (unsigned i = 0; i < count; ++i) {
    // `cqe` is bound to cqes[i] on a dropped line.
    if (cqe.user_data >= kUserDataCbIndex) {
      // Translate the user_data tag back into a centries_ slot.
      size_t index = cqe.user_data - kUserDataCbIndex;
      DCHECK_LT(index, centries_.size());
      auto& e = centries_[index];
      DCHECK(e.cb) << index;

      auto payload = e.val;
      // `func` (presumably moved out of e.cb on a dropped line) receives the
      // completion result, the stored payload and this proactor.
      func(cqe.res, payload, this);

    if (cqe.user_data == kIgnoreIndex)
      // (the skip statement for ignored completions is on a dropped line)

    if (cqe.user_data == kWakeIndex) {
      DCHECK_GE(cqe.res, 0);  // a poll on the eventfd should not fail
      DVLOG(1) << "Wakeup " << cqe.res << "/" << cqe.flags;

      // Drain the eventfd counter (reusing cqe.user_data as scratch space)
      // and re-arm the one-shot poll so the next WakeRing() is observed.
      CHECK_EQ(8, read(wake_fd_, &cqe.user_data, 8));
      struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
      io_uring_prep_poll_add(sqe, wake_fd_, POLLIN);
      sqe->user_data = kWakeIndex;

    // Fallback branch (its `else` was dropped): unknown tag.
    LOG(ERROR) << "Unrecognized user_data " << cqe.user_data;
// -- Fragment of Proactor::GetSubmitEntry(CbType cb, int64_t payload) --
// (signature per the doc index at the end of the file). Hands out the next
// free SQE, tagging it with a centries_ slot when a completion callback is
// supplied, or with kIgnoreIndex otherwise.
// NOTE(review): garbled fragment — the signature, the NULL-sqe guard around
// the blocking wait, the free-list pop / RegrowCentries path and the return
// statement were dropped by the extraction.
io_uring_sqe* res = io_uring_get_sqe(&ring_);

// SQ-ring-exhausted path (its guard was dropped): only fibers other than the
// main loop may block here waiting for space, since the main loop is the one
// that frees it.
fibers::context* current = fibers::context::active();
CHECK(current != main_loop_ctx_) << "SQE overflow in the main context";

// Sleep until the ring has room, then retry.
sqe_avail_.await([this] { return io_uring_sq_space_left(&ring_) > 0; });
res = io_uring_get_sqe(&ring_);

if (next_free_ < 0) {
  // (free-list exhaustion path dropped — presumably calls RegrowCentries();
  // confirm.)
DCHECK_GT(next_free_, 0);

// Tag the SQE so DispatchCompletions can find the callback slot.
res->user_data = next_free_ + kUserDataCbIndex;
DCHECK_LT(next_free_, centries_.size());

auto& e = centries_[next_free_];
DVLOG(1) << "GetSubmitEntry: index: " << next_free_ << ", socket: " << payload;
e.cb = std::move(cb);

// No-callback path (its branch structure was dropped): the completion will
// be silently ignored.
res->user_data = kIgnoreIndex;
// Registers `cb` as the handler for every signal in `l` via SigAction, or —
// in the second half of this fragment — restores SIG_DFL and clears the
// registry entries.
// NOTE(review): garbled fragment — the `struct sigaction sa;` declaration and
// the if/else split on `cb` (register vs. unregister) were dropped by the
// extraction.
void Proactor::RegisterSignal(std::initializer_list<uint16_t> l, std::function<void(int)> cb) {
  auto* state = get_signal_state();

  // `sa` is declared on a dropped line.
  memset(&sa, 0, sizeof(sa));

  // Registration path: route through SigAction with siginfo.
  sa.sa_flags = SA_SIGINFO;
  sa.sa_sigaction = &SigAction;

  for (uint16_t val : l) {
    CHECK(!state->signal_map[val].cb) << "Signal " << val << " was already registered";
    state->signal_map[val].cb = cb;
    state->signal_map[val].proactor = this;

    CHECK_EQ(0, sigaction(val, &sa, NULL));

  // Unregistration path: restore the default disposition and clear the slot.
  sa.sa_handler = SIG_DFL;

  for (uint16_t val : l) {
    // NOTE(review): this message looks copy-pasted from the branch above —
    // here the check asserts the signal WAS registered, so the text should
    // presumably read "was not registered"; confirm before changing.
    CHECK(state->signal_map[val].cb) << "Signal " << val << " was already registered";
    state->signal_map[val].cb = nullptr;
    state->signal_map[val].proactor = nullptr;

    CHECK_EQ(0, sigaction(val, &sa, NULL));
// Doubles centries_ and threads the new second half into the intrusive free
// list (each entry's `val` holds the index of the next free slot, matching
// the initialization in Init()).
// NOTE(review): the closing brace and, presumably, a statement resetting
// next_free_ to the first new slot were dropped by the extraction — confirm.
void Proactor::RegrowCentries() {
  size_t prev = centries_.size();
  VLOG(1) << "RegrowCentries from " << prev << " to " << prev * 2;

  centries_.resize(prev * 2);
  // Link slots [prev, size-2] each to its successor; the last slot acts as
  // the list terminator, as in Init().
  for (; prev < centries_.size() - 1; ++prev)
    centries_[prev].val = prev + 1;
bool await(Condition condition)
void Run(unsigned ring_depth=512, int wq_fd=-1)
void AsyncBrief(Func &&brief)
SubmitEntry GetSubmitEntry(CbType cb, int64_t payload)
Gets the SubmitEntry object used to issue an I/O request.
void Stop()
Signals proactor to stop. Does not wait for it.