sp_task_pool.h
1 // Copyright 2017, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 #ifndef _UTIL_SP_TASK_POOL_H
5 #define _UTIL_SP_TASK_POOL_H
6 
7 // Deprecated class. Used by pprint_utils.
8 //
#include <atomic>
#include <memory>
#include <mutex>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>
13 
14 #pragma GCC diagnostic push
15 #pragma GCC diagnostic ignored "-Wsign-compare"
16 #include "base/ProducerConsumerQueue.h"
17 #pragma GCC diagnostic pop
18 
19 #include <pthread.h>
20 
21 #include "base/event_count.h"
22 #include "base/type_traits.h"
23 #include "absl/utility/utility.h" // for absl::apply
24 
25 /*
 Single-producer, high-performance task pool designed to route work to worker threads.
 Unlike a typical thread pool, this one has a non-trivial life cycle: it allows setting up
 local, per-thread data for each worker thread, plus SharedData accessible by all threads.
 All of this is configured by a single Task class that encapsulates the user logic and the
 life cycle of that Task's data.
31 
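 Example:
   struct MyTask {
     struct SharedData {
       mutex* m;
       Output* o;
     };

     // Called once for every Task instance (one per worker thread plus one for the
     // calling thread) when the pool is launched.
     void InitShared(SharedData& shared) { shared_ = &shared; }

     // Receives the arguments passed to RunTask().
     void operator()(const string& str) {
       // Do something with str.
       ++i;

       std::lock_guard<mutex> lock(*shared_->m);
       shared_->o->Write(...);
     }

     SharedData* shared_ = nullptr;
     int i = 0;  // Per-thread data, accessible only to this task instance.
   };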
49  };
50 
51  // This TaskPool receives string stream as its input load. every string item is rerouted to
52  // available worker in the pool. The worker can process the string and update shared data.
53  // Private variables of the task are thread-local, can not be shared between tasks.
54  using TaskPool = util::SingleProducerTaskPool<MyTask, string>;
55 
56  // Declare pool with number of workers=O(#cpus), for each worker SPSC queue of size 10.
57  TaskPool pool("pool", 10);
58 
59  pool.SetSharedData(&mutex, &output); // Assign shared data.
60  pool.Launch();
61  ...
62  pool.RunTask(some_string1);
63  pool.RunTask(some_string2);
64  ....
65  pool.WaitForTasksToComplete();
66 
67  // Optionally call Finalize() for tasks that have this function.
68  pool.Finalize()
69 
70 */
71 
72 namespace util {
73 
74 namespace detail {
75 
76 /*
77  Single producer task pool.
78 */
80  public:
81 
  // Shared data is not owned or managed by this base class.
  // per_thread_capacity is the queue capacity of each worker thread.
84  SingleProducerTaskPoolBase(std::string name, unsigned per_thread_capacity,
85  unsigned num_threads = 0);
86  virtual ~SingleProducerTaskPoolBase();
87 
  // Blocks until every worker thread has been observed with no tasks to run at least once.
  // It gives no guarantee about tasks added after that point: it is the calling thread's
  // responsibility not to submit tasks while waiting in WaitForTasksToComplete().
92  void WaitForTasksToComplete();
93 
94  unsigned thread_count() const {
95  return thread_count_;
96  }
97 
  // Returns the largest current queue size across all worker threads.
99  unsigned QueueSize() const;
100 
101 #ifdef DEBUG_ROMAN
102  // Returns average queue delay of this taskpool in micro seconds.
103  uint64 AverageDelayUsec() const;
104 #endif
105 
106  private:
107 
108  struct RoutineConfig;
109  static void* ThreadRoutine(void* config);
110 
111  protected:
112  void LaunchThreads();
113  void JoinThreads();
114 
115  unsigned FindMostFreeThread() const;
116 
  // This interface separates the non-template pool code in this base class from the
  // C++ template wrapper logic.
120  virtual bool RunTask() = 0;
121  virtual bool IsQueueEmpty() const = 0;
122  virtual unsigned QueueSize() const = 0;
123  virtual ~ThreadLocalInterface();
124  };
125 
126  std::string base_name_;
127  std::atomic_bool start_cancel_;
128  unsigned per_thread_capacity_, thread_count_;
129 
130  struct ThreadInfo {
131  ThreadInfo() {
132  d.has_tasks = false;
133  }
134 
135  void Join();
136 
137  void Wake() {
138  d.ev_non_empty.notify();
139  }
140 
141  struct Data {
142  pthread_t thread_id = 0;
143  folly::EventCount ev_non_empty, ev_task_finished;
144 
145  std::atomic_bool has_tasks;
146  } d;
147 
148  // Eliminate false sharing.
149  char padding[CACHE_LINE_PAD(sizeof(d))];
150  };
151 
152  std::unique_ptr<ThreadInfo[]> thread_info_;
153  std::vector<std::unique_ptr<ThreadLocalInterface>> thread_interfaces_;
154 };
155 
// SharedDataOrEmptyTuple<T>::type — presumably T::SharedData when T declares that member type,
// and std::tuple<> otherwise (inferred from the macro's arguments; the macro comes from an
// included project header — confirm there).
GENERATE_TYPE_MEMBER_WITH_DEFAULT(SharedDataOrEmptyTuple, SharedData, std::tuple<>);
// Shorthand for SharedDataOrEmptyTuple<T>::type.
template <typename T>
using SharedDataOrEmptyTuple_t = typename SharedDataOrEmptyTuple<T>::type;
159 
160 } // namespace detail
161 
162 template <typename Task>
  // Queue type used for each worker's single-producer/single-consumer task queue.
  template <class T>
  using PCQ = folly::ProducerConsumerQueue<T>;
  // Type of the data shared by all Task instances (Task::SharedData when declared,
  // presumably std::tuple<> otherwise — see SharedDataOrEmptyTuple_t).
  using SharedTuple = detail::SharedDataOrEmptyTuple_t<Task>;
167 
168  // When tasks do not have shared data.
169  static void InitShared(Task& task, const std::tuple<>& t) {}
170 
171  // When tasks do have shared data.
172  template<typename S> static void InitShared(Task& task, S& s,
173  typename std::enable_if<!std::is_same<S, std::tuple<>>::value>::type* = 0) {
174  task.InitShared(s);
175  }
176 
  // Tuple of decayed argument types stored for each queued call; presumably the parameter
  // types of Task's call operator (see base::DecayedTupleFromParams — confirm there).
  using TaskArgs = typename base::DecayedTupleFromParams<Task>::type;
178 
179  struct CallItem {
180  TaskArgs args;
181 
182  template <typename... Args> CallItem(Args&&... a) : args(std::forward<Args>(a)...) {
183  }
184 
185  CallItem() {
186  }
187  };
188 
189  class QueueTaskImpl : public ThreadLocalInterface {
190  PCQ<CallItem> queue_;
191  SharedTuple& shared_data_;
192  Task task_;
193 
194  friend class SingleProducerTaskPool;
195 
196  public:
197  template <typename... Args>
198  QueueTaskImpl(unsigned size, SharedTuple& shared, Args&&... task_args)
199  : queue_(size), shared_data_(shared), task_(std::forward<Args>(task_args)...) {
200  InitShared(task_, shared);
201  }
202 
203  virtual bool RunTask() override {
204  CallItem item;
205  if (!queue_.read(item))
206  return false;
207 
208  // queue_delay_jiffies += (base::GetMonotonicJiffies() - item.ts);
209  // ++queue_delay_count;
210 
211  // In C++17 can be std::apply.
212  // Should it be forward or move? Depends if args are moveable or not.
213  // Ideally we want to move instead of copy.
214  absl::apply(task_, std::move(item.args));
215  return true;
216  }
217 
218  bool IsQueueEmpty() const override {
219  return queue_.isEmpty();
220  };
221  unsigned QueueSize() const override {
222  return queue_.sizeGuess();
223  }
224 
225  void Finalize() {
226  task_.Finalize();
227  }
228  };
229 
230  public:
231  // per_thread_capacity should be greater or equal to 2.
232  SingleProducerTaskPool(const char* name, unsigned per_thread_capacity, unsigned num_threads = 0)
233  : detail::SingleProducerTaskPoolBase(name, per_thread_capacity, num_threads) {
234  }
235 
237  }
238 
239  // Nonblocking routine. Returns true in case it succeeds to add the task to the pool.
240  template <typename... Args>
241  bool TryRunTask(Args&&... args) {
242  unsigned index = FindMostFreeThread();
243 
244  QueueTaskImpl* t = static_cast<QueueTaskImpl*>(thread_interfaces_[index].get());
245 
246  if (t->queue_.write(std::forward<Args>(args)...)) {
247  auto& ti = thread_info_[index];
248  ti.Wake();
249 
250  return true;
251  }
252 
253  return false;
254  }
255 
256  template <typename... Args>
257  void RunTask(Args&&... args) {
258  if (TryRunTask(std::forward<Args>(args)...))
259  return;
260  // We use TaskArgs to allow element by element initialization of the arguments.
261  RunInline(std::forward<Args>(args)...);
262  }
263 
264  template <typename... Args>
265  void RunInline(Args&&... args) {
266  (*calling_thread_task_)(std::forward<Args>(args)...);
267  }
268 
269  template <typename... Args>
270  void SetSharedData(Args&&... args) {
271  shared_data_ = SharedTuple{std::forward<Args>(args)...};
272  }
273 
274  template <typename... Args>
275  void Launch(Args&&... args) {
276  if (calling_thread_task_)
277  return;
278  calling_thread_task_.reset(new Task(std::forward<Args>(args)...));
279  InitShared(*calling_thread_task_, shared_data_);
280 
281  thread_interfaces_.resize(thread_count());
282  for (auto& ti : thread_interfaces_) {
283  ti.reset(new QueueTaskImpl(per_thread_capacity_, shared_data_, std::forward<Args>(args)...));
284  }
285  LaunchThreads();
286  }
287 
288  // Runs sequentially, thread-safe.
289  void Finalize() {
290  if (!calling_thread_task_)
291  return;
292  calling_thread_task_->Finalize();
293  for (auto& ti : thread_interfaces_) {
294  static_cast<QueueTaskImpl*>(ti.get())->Finalize();
295  }
296  }
297 
298  private:
  // Task instance used by RunInline(); created by Launch() and null before that.
  std::unique_ptr<Task> calling_thread_task_;
  // Data set by SetSharedData(); each worker's QueueTaskImpl is constructed with a reference to it.
  SharedTuple shared_data_;
301 };
302 
303 } // namespace util
304 
305 #endif // _UTIL_SP_TASK_POOL_H