6 #include "base/mpmc_bounded_queue.h" 7 #include "util/fibers/fibers_ext.h" 10 namespace fibers_ext {
12 class FiberQueueThreadPool;
24 explicit FiberQueue(
unsigned queue_size = 128);
27 template <
typename F>
bool TryAdd(F&& f) {
28 if (queue_.try_enqueue(std::forward<F>(f))) {
42 template <
typename F>
bool Add(F&& f) {
43 if (TryAdd(std::forward<F>(f))) {
51 if (TryAdd(std::forward<F>(f))) {
55 push_ec_.wait(key.epoch());
67 template <
typename F>
auto Await(F&& f) -> decltype(f()) {
69 using ResultType = decltype(f());
72 Add([&mover, f = std::forward<F>(f), done]()
mutable {
78 return std::move(mover).get();
90 typedef std::function<void()> CbFunc;
93 using FuncQ = base::mpmc_bounded_queue<CbFunc>;
97 std::atomic_bool is_closed_{
false};
106 template <
typename F>
auto Await(F&& f) -> decltype(f()) {
108 using ResultType = decltype(f());
111 Add([&, f = std::forward<F>(f), done]()
mutable {
117 return std::move(mover).get();
120 template <
typename F>
auto Await(
size_t worker_index, F&& f) -> decltype(f()) {
122 using ResultType = decltype(f());
125 Add(worker_index, [&, f = std::forward<F>(f), done]()
mutable {
131 return std::move(mover).get();
134 template <
typename F>
void Add(F&& f) {
135 size_t start = next_index_.fetch_add(1, std::memory_order_relaxed) % worker_size_;
136 Worker& main_w = workers_[start];
139 if (AddAnyWorker(start, std::forward<F>(f))) {
143 main_w.q->push_ec_.wait(key.epoch());
156 template <
typename F>
bool Add(
size_t index, F&& f) {
157 return workers_[index % worker_size_].q->Add(std::forward<F>(f));
160 FiberQueue* GetQueue(
size_t index) {
return workers_[index % worker_size_].q.get();}
165 size_t wrapped_idx(
size_t i) {
return i < worker_size_ ? i : i - worker_size_; }
167 template <
typename F>
bool AddAnyWorker(
size_t start, F&& f) {
168 for (
size_t i = 0; i < worker_size_; ++i) {
169 auto& w = workers_[wrapped_idx(start + i)];
170 if (w.q->TryAdd(std::forward<F>(f))) {
177 void WorkerFunction(
unsigned index);
181 std::unique_ptr<FiberQueue> q;
184 std::unique_ptr<Worker[]> workers_;
187 std::atomic_ulong next_index_{0};
void Shutdown()
Notifies Run() function to empty the queue and to exit. Does not block.
MPSC task-queue that is handled by a single consumer loop.
bool Add(size_t index, F &&f)
Runs f on a worker pinned by "index". index does not have to be in range.
bool Add(F &&f)
Submits a callback into the queue. Should not be called after calling Shutdown().
auto Await(F &&f) -> decltype(f())
Sends f to consumer thread and waits for it to finish runnning.