10 #include <boost/fiber/fiber.hpp> 13 #include "base/function2.hpp" 14 #include "base/mpmc_bounded_queue.h" 15 #include "util/fibers/event_count.h" 16 #include "util/fibers/fibers_ext.h" 17 #include "util/uring/submit_entry.h" 27 void operator=(
const Proactor&) =
delete;
34 void Run(
unsigned ring_depth = 512,
int wq_fd = -1);
43 using CbType = std::function<void(IoResult, int64_t,
Proactor*)>;
70 return pthread_self() == thread_id_;
73 auto thread_id()
const {
77 static bool IsProactorThread() {
78 return tl_info_.is_proactor_thread;
83 static uint64_t GetMonotonicTimeNs() {
84 return tl_info_.monotonic_time;
89 static int32_t GetIndex() {
90 return tl_info_.proactor_index;
94 static void SetIndex(uint32_t index) {
95 tl_info_.proactor_index = index;
99 bool HasFastPoll()
const {
107 template <
typename Func>
void AsyncBrief(Func&& brief);
// Similar to AsyncBrief, but waits for `brief` to return.
// The declared return type is decltype(brief()), i.e. whatever the callable itself yields.
113 template <
typename Func>
auto AwaitBrief(Func&& brief) -> decltype(brief());
117 template <
typename Func,
typename... Args>
void AsyncFiber(Func&& f, Args&&... args) {
121 AsyncBrief([f = std::forward<Func>(f), args...]()
mutable {
122 ::boost::fibers::fiber(std::forward<Func>(f), std::forward<Args>(args)...).detach();
128 template <
typename... Args> boost::fibers::fiber LaunchFiber(Args&&... args) {
129 ::boost::fibers::fiber fb;
132 AwaitBrief([&] { fb = boost::fibers::fiber(std::forward<Args>(args)...); });
141 template <
typename Func>
auto AwaitBlocking(Func&& f) -> decltype(f());
// Registers `cb` (a callable taking an int) for the signal numbers listed in `l`.
// ClearSignal() calls this with a null `cb`.
// NOTE(review): presumably a null `cb` removes the handlers — confirm against the implementation,
// which is not visible here.
143 void RegisterSignal(std::initializer_list<uint16_t> l, std::function<
void(
int)> cb);
145 void ClearSignal(std::initializer_list<uint16_t> l) {
146 RegisterSignal(l,
nullptr);
// Read-only accessor: returns the ring_fd field of ring_.
// NOTE(review): presumably this is the file descriptor of the proactor's io_uring instance — confirm.
149 int ring_fd()
const {
return ring_.ring_fd;}
// Sentinel with only bit 31 set. WakeupIfNeeded() compares the pre-increment value of tq_seq_
// against this constant to decide whether to take its wake-up branch.
152 enum { WAIT_SECTION_STATE = 1UL << 31 };
154 void Init(
size_t ring_size,
int wq_fd = -1);
158 void WakeupIfNeeded() {
159 auto current = tq_seq_.fetch_add(2, std::memory_order_relaxed);
160 if (current == WAIT_SECTION_STATE) {
165 void DispatchCompletions(io_uring_cqe* cqes,
unsigned count);
167 template <
typename Func>
bool EmplaceTaskQueue(Func&& f) {
168 if (task_queue_.try_enqueue(std::forward<Func>(f))) {
176 void RegrowCentries();
179 pthread_t thread_id_ = 0U;
182 bool is_stopped_ =
true;
183 uint8_t fast_poll_f_ : 1;
184 uint8_t reseved_f_ : 7;
188 fu2::function_base<
true ,
false , fu2::capacity_default,
189 false ,
false , void()>;
190 static_assert(
sizeof(Tasklet) == 32,
"");
192 using FuncQ = base::mpmc_bounded_queue<Tasklet>;
194 using EventCount = fibers_ext::EventCount;
197 std::atomic_uint32_t tq_seq_{0}, tq_wakeups_{0};
198 EventCount task_queue_avail_, sqe_avail_;
199 ::boost::fibers::context* main_loop_ctx_ =
nullptr;
201 friend class UringFiberAlgo;
203 struct CompletionEntry {
211 static_assert(
sizeof(CompletionEntry) == 40,
"");
213 std::vector<CompletionEntry> centries_;
214 int32_t next_free_ = -1;
217 bool is_proactor_thread =
false;
218 uint32_t proactor_index = 0;
219 uint64_t monotonic_time = 0;
221 static thread_local TLInfo tl_info_;
230 if (EmplaceTaskQueue(std::forward<Func>(f)))
236 if (EmplaceTaskQueue(std::forward<Func>(f))) {
239 task_queue_avail_.wait(key.epoch());
247 if (IsProactorThread()) {
252 using ResultType = decltype(f());
256 AsyncBrief([&mover, f = std::forward<Func>(f), done]()
mutable {
262 return std::move(mover).get();
270 template <
typename Func>
auto Proactor::AwaitBlocking(Func&& f) -> decltype(f()) {
275 using ResultType = decltype(f());
277 auto fb = LaunchFiber([&] { mover.Apply(std::forward<Func>(f)); });
280 return std::move(mover).get();
void Run(unsigned ring_depth=512, int wq_fd=-1)
void AsyncBrief(Func &&brief)
SubmitEntry GetSubmitEntry(CbType cb, int64_t payload)
Gets the SubmitEntry object needed to issue an I/O request.
auto AwaitBrief(Func &&brief) -> decltype(brief())
Similar to AsyncBrief, but waits for 'brief' to return.
void Stop()
Signals proactor to stop. Does not wait for it.
void AsyncFiber(Func &&f, Args &&... args)
bool InMyThread() const
Returns true if the caller is running in this Proactor thread.