fiberqueue_threadpool.h
1 // Copyright 2019, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 #pragma once
5 
6 #include "base/mpmc_bounded_queue.h"
7 #include "util/fibers/fibers_ext.h"
8 
9 namespace util {
10 namespace fibers_ext {
11 
12 class FiberQueueThreadPool;
13 
14 
20 class FiberQueue {
21  friend class FiberQueueThreadPool;
22 
23  public:
24  explicit FiberQueue(unsigned queue_size = 128);
25  FiberQueue();
26 
27  template <typename F> bool TryAdd(F&& f) {
28  if (queue_.try_enqueue(std::forward<F>(f))) {
29  pull_ec_.notify();
30  return true;
31  }
32  return false;
33  }
34 
42  template <typename F> bool Add(F&& f) {
43  if (TryAdd(std::forward<F>(f))) {
44  return false;
45  }
46 
47  bool result = false;
48  while (true) {
49  EventCount::Key key = push_ec_.prepareWait();
50 
51  if (TryAdd(std::forward<F>(f))) {
52  break;
53  }
54  result = true;
55  push_ec_.wait(key.epoch());
56  }
57  return result;
58  }
59 
67  template <typename F> auto Await(F&& f) -> decltype(f()) {
68  Done done;
69  using ResultType = decltype(f());
71 
72  Add([&mover, f = std::forward<F>(f), done]() mutable {
73  mover.Apply(f);
74  done.Notify();
75  });
76 
77  done.Wait();
78  return std::move(mover).get();
79  }
80 
85  void Shutdown();
86 
87  void Run();
88 
89  private:
90  typedef std::function<void()> CbFunc;
91 
92 
93  using FuncQ = base::mpmc_bounded_queue<CbFunc>;
94  FuncQ queue_;
95 
96  EventCount push_ec_, pull_ec_;
97  std::atomic_bool is_closed_{false};
98 };
99 
100 // This thread pool has a global fiber-friendly queue for incoming tasks.
102  public:
103  explicit FiberQueueThreadPool(unsigned num_threads = 0, unsigned queue_size = 128);
105 
106  template <typename F> auto Await(F&& f) -> decltype(f()) {
107  Done done;
108  using ResultType = decltype(f());
110 
111  Add([&, f = std::forward<F>(f), done]() mutable {
112  mover.Apply(f);
113  done.Notify();
114  });
115 
116  done.Wait();
117  return std::move(mover).get();
118  }
119 
120  template <typename F> auto Await(size_t worker_index, F&& f) -> decltype(f()) {
121  Done done;
122  using ResultType = decltype(f());
124 
125  Add(worker_index, [&, f = std::forward<F>(f), done]() mutable {
126  mover.Apply(f);
127  done.Notify();
128  });
129 
130  done.Wait();
131  return std::move(mover).get();
132  }
133 
134  template <typename F> void Add(F&& f) {
135  size_t start = next_index_.fetch_add(1, std::memory_order_relaxed) % worker_size_;
136  Worker& main_w = workers_[start];
137  while (true) {
138  EventCount::Key key = main_w.q->push_ec_.prepareWait();
139  if (AddAnyWorker(start, std::forward<F>(f))) {
140  break;
141  }
142 
143  main_w.q->push_ec_.wait(key.epoch());
144  }
145  }
146 
147 
156  template <typename F> bool Add(size_t index, F&& f) {
157  return workers_[index % worker_size_].q->Add(std::forward<F>(f));
158  }
159 
160  FiberQueue* GetQueue(size_t index) { return workers_[index % worker_size_].q.get();}
161 
162  void Shutdown();
163 
164  private:
165  size_t wrapped_idx(size_t i) { return i < worker_size_ ? i : i - worker_size_; }
166 
167  template <typename F> bool AddAnyWorker(size_t start, F&& f) {
168  for (size_t i = 0; i < worker_size_; ++i) {
169  auto& w = workers_[wrapped_idx(start + i)];
170  if (w.q->TryAdd(std::forward<F>(f))) {
171  return true;
172  }
173  }
174  return false;
175  }
176 
177  void WorkerFunction(unsigned index);
178 
179  struct Worker {
180  pthread_t tid;
181  std::unique_ptr<FiberQueue> q;
182  };
183 
184  std::unique_ptr<Worker[]> workers_;
185  size_t worker_size_;
186 
187  std::atomic_ulong next_index_{0};
188 };
189 
190 } // namespace fibers_ext
191 } // namespace util
void Shutdown()
Notifies Run() function to empty the queue and to exit. Does not block.
MPSC task-queue that is handled by a single consumer loop.
bool Add(size_t index, F &&f)
Runs f on a worker pinned by "index". index does not have to be in range.
bool Add(F &&f)
Submits a callback into the queue. Should not be called after calling Shutdown().
auto Await(F &&f) -> decltype(f())
Sends f to consumer thread and waits for it to finish runnning.