sp_task_pool.cc
1 // Copyright 2015, .com . All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 
5 #include "util/sp_task_pool.h"
6 
7 #include <thread>
8 
9 #include "base/logging.h"
10 #include "base/pthread_utils.h"
11 #include "base/walltime.h"
12 
13 namespace util {
14 
15 namespace detail {
16 
17 void SingleProducerTaskPoolBase::ThreadInfo::Join() {
18  if (d.thread_id) {
19  pthread_cancel(d.thread_id);
20  PTHREAD_CHECK(join(d.thread_id, nullptr));
21  d.thread_id = 0;
22  }
23 }
24 
27  unsigned thread_index;
28 };
29 
30 SingleProducerTaskPoolBase::ThreadLocalInterface::~ThreadLocalInterface() {}
31 
32 SingleProducerTaskPoolBase::SingleProducerTaskPoolBase(
33  std::string name, unsigned queue_capacity, unsigned int num_threads)
34  : base_name_(std::move(name)), per_thread_capacity_(queue_capacity) {
35  CHECK_GE(queue_capacity, 2);
36 
37  start_cancel_ = false;
38 
39  if (num_threads == 0) {
40  uint32 num_cpus = std::thread::hardware_concurrency();
41  if (num_cpus == 0)
42  num_threads = 2;
43  else
44  num_threads = num_cpus * 2;
45  }
46  VLOG(1) << "TaskPool " << base_name_ << " with " << num_threads << " threads";
47  thread_count_ = num_threads;
48 }
49 
50 SingleProducerTaskPoolBase::~SingleProducerTaskPoolBase() {
51  JoinThreads();
52 }
53 
54 void SingleProducerTaskPoolBase::LaunchThreads() {
55  CHECK(!thread_info_);
56 
57  thread_info_.reset(new ThreadInfo[thread_count_]);
58 
59  char buf[16];
60  for (unsigned i = 0; i < thread_count_; ++i) {
61  snprintf(buf, sizeof(buf), "%s%d", base_name_.c_str(), i);
62 
63  thread_info_[i].d.thread_id = base::StartThread(buf, ThreadRoutine, new RoutineConfig{this, i});
64  }
65 }
66 
67 void SingleProducerTaskPoolBase::JoinThreads() {
68  start_cancel_ = true;
69  for (unsigned i = 0; i < thread_count_; ++i) {
70  auto& t = thread_info_[i];
71  t.Wake();
72  t.Join();
73  }
74 }
75 
76 
77 unsigned SingleProducerTaskPoolBase::FindMostFreeThread() const {
78  // Give each thread a score according to his queue size and if its runnning task.
79  // Rerun thread index with lowest score.
80  uint32 min_score = kuint32max;
81  unsigned index = 0;
82  for (unsigned i = 0; i < thread_count_; ++i) {
83  uint32 score = thread_interfaces_[i]->QueueSize();
84  if (thread_info_[i].d.has_tasks) {
85  ++score;
86  }
87  if (score == 0) {
88  return i; // If found thread with score 0 return this thread index.
89  }
90  if (score < min_score) {
91  index = i;
92  min_score = score;
93  }
94  }
95  return index;
96 }
97 
98 
99 void SingleProducerTaskPoolBase::WaitForTasksToComplete() {
100  // We assuming that producer thread stopped enqueing tasks.
101  for (unsigned i = 0; i < thread_count_; ++i) {
102  const ThreadLocalInterface* tli = thread_interfaces_[i].get();
103  ThreadInfo::Data& d = thread_info_[i].d;
104 
105  d.ev_task_finished.await([tli, &d] { return tli->IsQueueEmpty() && !d.has_tasks; });
106  }
107  VLOG(1) << "WaitForTasksToComplete finished";
108 }
109 
110 unsigned SingleProducerTaskPoolBase::QueueSize() const {
111  unsigned res = 0;
112  for (unsigned i = 0; i < thread_count_; ++i) {
113  res = std::max(res, thread_interfaces_[i]->QueueSize());
114  }
115  return res;
116 }
117 
118 #ifdef DEBUG_ROMAN
119 uint64 SingleProducerTaskPoolBase::AverageDelayUsec() const {
120  uint64 jiffies = 0;
121  uint64 count = 0;
122  for (unsigned i = 0; i < thread_count_; ++i) {
123  const ThreadLocalInterface* tli = thread_interfaces_[i].get();
124  jiffies += tli->queue_delay_jiffies;
125  count += tli->queue_delay_count;
126  }
127 
128  return count ? jiffies * 100 / count : 0;
129 }
130 #endif
131 
132 void* SingleProducerTaskPoolBase::ThreadRoutine(void* arg) {
133  RoutineConfig* config = (RoutineConfig*)arg;
134  SingleProducerTaskPoolBase* me = config->me;
135  ThreadInfo::Data& ti = me->thread_info_[config->thread_index].d;
136 
137  ThreadLocalInterface* thread_interface = me->thread_interfaces_[config->thread_index].get();
138 
139  delete config;
140  config = nullptr;
141  auto await_check = [me, thread_interface]() {
142  return me->start_cancel_ || !thread_interface->IsQueueEmpty();
143  };
144 
145  unsigned num_yields = 0;
146  while (!me->start_cancel_) {
147  ti.has_tasks.store(true, std::memory_order_release);
148  while (thread_interface->RunTask()) {
149  num_yields = 0;
150  }
151  ti.has_tasks.store(false, std::memory_order_release);
152 
153  if (++num_yields > 100) {
154  // We iterate 1000 times before even bother to sleep.
155 
156  if (thread_interface->IsQueueEmpty()) {
157  VLOG(2) << "ti.empty_q_cv.notify";
158 
159  ti.ev_task_finished.notify();
160  ti.ev_non_empty.await(await_check);
161  }
162  }
163  }
164  char buf[30] = {0};
165  pthread_getname_np(pthread_self(), buf, sizeof buf);
166  VLOG(1) << "Finishing running SingleProducerTaskPoolBase thread " << buf;
167 
168  return NULL;
169 }
170 
171 } // namespace detail
172 
173 } // namespace util