5 #include "util/uring/uring_fiber_algo.h" 7 #include "base/logging.h" 8 #include "util/uring/proactor.h" 14 using namespace boost;
// Constructs the scheduling algorithm for the given proactor.
// Stores `proactor` in proactor_ and records the fiber context that is active
// at construction time in main_cntx_.
17 UringFiberAlgo::UringFiberAlgo(Proactor* proactor) : proactor_(proactor) {
18 main_cntx_ = fibers::context::active();
// The constructing context is required to be a main context.
19 CHECK(main_cntx_->is_context(fibers::type::main_context));
// Destructor for UringFiberAlgo.
22 UringFiberAlgo::~UringFiberAlgo() {
// Scheduler hook for an awakened context: links `ctx` into the ready queue
// rqueue_. `props` supplies the name printed in the debug log for contexts
// that are not the dispatcher context.
25 void UringFiberAlgo::awakened(FiberContext* ctx, UringFiberProps& props) noexcept {
// The context must not already be linked into a ready queue.
26 DCHECK(!ctx->ready_is_linked());
// The dispatcher context is logged without a name; other fibers log props.name().
28 if (ctx->is_context(fibers::type::dispatcher_context)) {
29 DVLOG(2) <<
"Awakened dispatch";
31 DVLOG(2) <<
"Awakened " << props.name();
// Enqueue the context on the ready queue.
36 ctx->ready_link(rqueue_);
// Scheduler hook that selects the context at the front of the ready queue
// rqueue_. Logs ready_cnt_ and the queue size on entry, and logs which
// context is being switched to.
39 auto UringFiberAlgo::pick_next() noexcept -> FiberContext* {
40 DVLOG(2) <<
"pick_next: " << ready_cnt_ <<
"/" << rqueue_.size();
// Candidate is the head of the ready queue.
45 FiberContext* ctx = &rqueue_.front();
// Non-dispatcher contexts carry UringFiberProps, whose name is used for logging.
48 if (!ctx->is_context(boost::fibers::type::dispatcher_context)) {
// NOTE(review): assumes the properties attached to ctx are UringFiberProps - confirm.
50 UringFiberProps* props = (UringFiberProps*)ctx->get_properties();
51 DVLOG(1) <<
"Switching to " << props->name();
53 DVLOG(1) <<
"Switching to dispatch";
// Scheduler hook taking a context and its UringFiberProps; presumably invoked
// when the fiber's properties change - confirm against the header.
// The visible logic branches on whether `ctx` is currently linked into a ready
// queue and on whether it is the dispatcher context.
58 void UringFiberAlgo::property_change(FiberContext* ctx, UringFiberProps& props) noexcept {
// Branch taken when ctx is not linked into a ready queue.
59 if (!ctx->ready_is_linked()) {
// Branch taken for any context other than the dispatcher.
65 if (!ctx->is_context(fibers::type::dispatcher_context)) {
// Returns true when ready_cnt_ is greater than zero.
// NOTE(review): ready_cnt_ presumably counts ready non-dispatcher fibers - confirm.
75 bool UringFiberAlgo::has_ready_fibers() const noexcept {
76 return ready_cnt_ > 0;
// Scheduler hook called with a deadline `abs_time`.
// When abs_time is not time_point::max(), prepares a timeout entry on the
// proactor for that deadline. The main context is scheduled via its scheduler.
// Expected to run on the dispatcher context (checked in debug builds).
81 void UringFiberAlgo::suspend_until(
const time_point& abs_time) noexcept {
82 auto* cur_cntx = fibers::context::active();
84 DCHECK(cur_cntx->is_context(fibers::type::dispatcher_context));
// time_point::max() is treated as "no deadline": no timeout is prepared.
85 if (time_point::max() != abs_time) {
// Completion callback for the timeout entry; it only validates and logs the result.
86 auto cb = [](Proactor::IoResult res, int64_t, Proactor*) {
// -EINVAL is treated as the kernel not supporting this operation.
90 DCHECK_NE(res, -EINVAL) <<
"This linux version does not support this operation";
91 DVLOG(1) <<
"this_fiber::yield " << res;
110 SubmitEntry se = proactor_->GetSubmitEntry(std::move(cb), 0);
111 using namespace chrono;
112 constexpr uint64_t kNsFreq = 1000000000ULL;
// Convert the deadline to nanoseconds since the clock's epoch, then split it
// into whole seconds and the nanosecond remainder.
114 const chrono::time_point<steady_clock, nanoseconds>& tp = abs_time;
115 uint64_t ns = time_point_cast<nanoseconds>(tp).time_since_epoch().count();
116 ts_.tv_sec = ns / kNsFreq;
117 ts_.tv_nsec = ns - ts_.tv_sec * kNsFreq;
// NOTE(review): ts_ is a member rather than a local, presumably because it must
// stay valid after this function returns - confirm.
121 se.PrepTimeout(&ts_);
// Schedule the main context through its scheduler.
125 main_cntx_->get_scheduler()->schedule(main_cntx_);
// Scheduler hook that signals the proactor. Logs the calling thread's id, sets
// bit 0 of proactor_->tq_seq_, and wakes the ring if the previous value was
// Proactor::WAIT_SECTION_STATE.
131 void UringFiberAlgo::notify() noexcept {
132 DVLOG(1) <<
"notify from " << syscall(SYS_gettid);
// Atomically OR in the low bit and inspect the value that was there before.
138 auto prev_val = proactor_->tq_seq_.fetch_or(1, std::memory_order_relaxed);
// NOTE(review): WAIT_SECTION_STATE presumably means the proactor thread is
// blocked waiting on the ring, so an explicit wake-up is needed - confirm.
139 if (prev_val == Proactor::WAIT_SECTION_STATE) {
140 proactor_->WakeRing();