4 #ifndef _UTIL_SP_TASK_POOL_H 5 #define _UTIL_SP_TASK_POOL_H 14 #pragma GCC diagnostic push 15 #pragma GCC diagnostic ignored "-Wsign-compare" 16 #include "base/ProducerConsumerQueue.h" 17 #pragma GCC diagnostic pop 21 #include "base/event_count.h" 22 #include "base/type_traits.h" 23 #include "absl/utility/utility.h" 85 unsigned num_threads = 0);
92 void WaitForTasksToComplete();
94 unsigned thread_count()
const {
99 unsigned QueueSize()
const;
103 uint64 AverageDelayUsec()
const;
109 static void* ThreadRoutine(
void* config);
112 void LaunchThreads();
115 unsigned FindMostFreeThread()
const;
120 virtual bool RunTask() = 0;
121 virtual bool IsQueueEmpty()
const = 0;
122 virtual unsigned QueueSize()
const = 0;
126 std::string base_name_;
127 std::atomic_bool start_cancel_;
128 unsigned per_thread_capacity_, thread_count_;
138 d.ev_non_empty.notify();
142 pthread_t thread_id = 0;
143 folly::EventCount ev_non_empty, ev_task_finished;
145 std::atomic_bool has_tasks;
149 char padding[CACHE_LINE_PAD(
sizeof(d))];
152 std::unique_ptr<ThreadInfo[]> thread_info_;
153 std::vector<std::unique_ptr<ThreadLocalInterface>> thread_interfaces_;
156 GENERATE_TYPE_MEMBER_WITH_DEFAULT(SharedDataOrEmptyTuple, SharedData, std::tuple<>);
157 template <
typename T>
158 using SharedDataOrEmptyTuple_t =
typename SharedDataOrEmptyTuple<T>::type;
162 template <
typename Task>
165 using PCQ = folly::ProducerConsumerQueue<T>;
166 using SharedTuple = detail::SharedDataOrEmptyTuple_t<Task>;
169 static void InitShared(Task& task,
const std::tuple<>& t) {}
172 template<
typename S>
static void InitShared(Task& task, S& s,
173 typename std::enable_if<!std::is_same<S, std::tuple<>>::value>::type* = 0) {
177 using TaskArgs =
typename base::DecayedTupleFromParams<Task>::type;
182 template <
typename... Args> CallItem(Args&&... a) : args(std::forward<Args>(a)...) {
190 PCQ<CallItem> queue_;
191 SharedTuple& shared_data_;
197 template <
typename... Args>
198 QueueTaskImpl(
unsigned size, SharedTuple& shared, Args&&... task_args)
199 : queue_(size), shared_data_(shared), task_(std::forward<Args>(task_args)...) {
200 InitShared(task_, shared);
203 virtual bool RunTask()
override {
205 if (!queue_.read(item))
214 absl::apply(task_, std::move(item.args));
218 bool IsQueueEmpty()
const override {
219 return queue_.isEmpty();
221 unsigned QueueSize()
const override {
222 return queue_.sizeGuess();
240 template <
typename... Args>
241 bool TryRunTask(Args&&... args) {
242 unsigned index = FindMostFreeThread();
244 QueueTaskImpl* t = static_cast<QueueTaskImpl*>(thread_interfaces_[index].get());
246 if (t->queue_.write(std::forward<Args>(args)...)) {
247 auto& ti = thread_info_[index];
256 template <
typename... Args>
257 void RunTask(Args&&... args) {
258 if (TryRunTask(std::forward<Args>(args)...))
261 RunInline(std::forward<Args>(args)...);
264 template <
typename... Args>
265 void RunInline(Args&&... args) {
266 (*calling_thread_task_)(std::forward<Args>(args)...);
269 template <
typename... Args>
270 void SetSharedData(Args&&... args) {
271 shared_data_ = SharedTuple{std::forward<Args>(args)...};
274 template <
typename... Args>
275 void Launch(Args&&... args) {
276 if (calling_thread_task_)
278 calling_thread_task_.reset(
new Task(std::forward<Args>(args)...));
279 InitShared(*calling_thread_task_, shared_data_);
281 thread_interfaces_.resize(thread_count());
282 for (
auto& ti : thread_interfaces_) {
283 ti.reset(
new QueueTaskImpl(per_thread_capacity_, shared_data_, std::forward<Args>(args)...));
290 if (!calling_thread_task_)
292 calling_thread_task_->Finalize();
293 for (
auto& ti : thread_interfaces_) {
294 static_cast<QueueTaskImpl*>(ti.get())->Finalize();
299 std::unique_ptr<Task> calling_thread_task_;
300 SharedTuple shared_data_;
305 #endif // _UTIL_SP_TASK_POOL_H