5 #include "util/sp_task_pool.h" 9 #include "base/logging.h" 10 #include "base/pthread_utils.h" 11 #include "base/walltime.h" 17 void SingleProducerTaskPoolBase::ThreadInfo::Join() {
// Stops the worker thread recorded in d.thread_id: first requests its
// cancellation with pthread_cancel, then joins it.
19 pthread_cancel(d.thread_id);
// nullptr is passed for the result pointer, so the thread's return value is
// discarded. NOTE(review): PTHREAD_CHECK(join(...)) presumably expands to a
// checked pthread_join call that aborts on a non-zero result -- confirm.
20 PTHREAD_CHECK(join(d.thread_id,
nullptr));
// Index of the worker; ThreadRoutine uses it to select this thread's entries
// in thread_info_ and thread_interfaces_.
27 unsigned thread_index;
30 SingleProducerTaskPoolBase::ThreadLocalInterface::~ThreadLocalInterface() {}
// Constructs the pool base.
//   name           - stored in base_name_; LaunchThreads derives each worker's
//                    thread name from it.
//   queue_capacity - stored in per_thread_capacity_; must be at least 2
//                    (enforced by CHECK_GE).
//   num_threads    - number of worker threads; 0 requests a default derived
//                    from std::thread::hardware_concurrency().
32 SingleProducerTaskPoolBase::SingleProducerTaskPoolBase(
33 std::string name,
unsigned queue_capacity,
unsigned int num_threads)
34 : base_name_(std::move(name)), per_thread_capacity_(queue_capacity) {
35 CHECK_GE(queue_capacity, 2);
// Workers test start_cancel_ in ThreadRoutine and leave their loop once it is
// set, so it starts out cleared.
37 start_cancel_ =
false;
// Auto-sizing: with num_threads == 0 the visible default is twice the CPU
// count. NOTE(review): std::thread::hardware_concurrency() may return 0 when
// the count cannot be determined -- confirm that case is handled.
39 if (num_threads == 0) {
40 uint32 num_cpus = std::thread::hardware_concurrency();
44 num_threads = num_cpus * 2;
46 VLOG(1) <<
"TaskPool " << base_name_ <<
" with " << num_threads <<
" threads";
// thread_count_ bounds every per-worker loop in this class.
47 thread_count_ = num_threads;
// Destructor of the pool base.
// NOTE(review): confirm that worker threads are stopped and joined before
// thread_info_ is released, since ThreadRoutine keeps references into it.
50 SingleProducerTaskPoolBase::~SingleProducerTaskPoolBase() {
// Allocates one ThreadInfo per worker and starts thread_count_ threads, each
// running ThreadRoutine. The returned thread id is stored in the matching
// ThreadInfo entry.
54 void SingleProducerTaskPoolBase::LaunchThreads() {
57 thread_info_.reset(
new ThreadInfo[thread_count_]);
60 for (
unsigned i = 0; i < thread_count_; ++i) {
// The thread name is base_name_ followed by the worker index.
// NOTE(review): i is unsigned but the format uses %d; %u would match the type.
61 snprintf(buf,
sizeof(buf),
"%s%d", base_name_.c_str(), i);
// The heap-allocated RoutineConfig carries the pool pointer and the worker
// index to ThreadRoutine.
// NOTE(review): confirm where this allocation is released.
63 thread_info_[i].d.thread_id = base::StartThread(buf, ThreadRoutine,
new RoutineConfig{
this, i});
// Walks the ThreadInfo entry of every one of the thread_count_ workers.
// NOTE(review): presumably each entry is stopped via ThreadInfo::Join() --
// confirm.
67 void SingleProducerTaskPoolBase::JoinThreads() {
69 for (
unsigned i = 0; i < thread_count_; ++i) {
70 auto& t = thread_info_[i];
// Scans all workers and scores each one, starting from its QueueSize(); lower
// scores are compared against the running minimum.
// NOTE(review): presumably returns the index of the lowest-scoring worker --
// confirm.
77 unsigned SingleProducerTaskPoolBase::FindMostFreeThread()
const {
// Sentinel: any score below kuint32max passes the comparison further down.
80 uint32 min_score = kuint32max;
82 for (
unsigned i = 0; i < thread_count_; ++i) {
// The base score is the worker's current QueueSize().
83 uint32 score = thread_interfaces_[i]->QueueSize();
// has_tasks is set by ThreadRoutine while the worker drains its queue.
// NOTE(review): presumably a busy worker's score is penalized here -- confirm.
84 if (thread_info_[i].d.has_tasks) {
// The comparison is strict, so an equal score does not pass this check.
90 if (score < min_score) {
// Waits, one worker at a time in index order, on each worker's
// ev_task_finished event with a predicate requiring that the worker's queue is
// empty and its has_tasks flag is false.
99 void SingleProducerTaskPoolBase::WaitForTasksToComplete() {
101 for (
unsigned i = 0; i < thread_count_; ++i) {
102 const ThreadLocalInterface* tli = thread_interfaces_[i].get();
103 ThreadInfo::Data& d = thread_info_[i].d;
// ThreadRoutine notifies ev_task_finished when it finds its queue empty.
// NOTE(review): presumably await() returns once the predicate holds -- confirm.
105 d.ev_task_finished.await([tli, &d] {
return tli->IsQueueEmpty() && !d.has_tasks; });
107 VLOG(1) <<
"WaitForTasksToComplete finished";
// Folds the per-worker QueueSize() values into res with std::max, so res holds
// the largest queue size seen across the workers visited.
// NOTE(review): presumably res starts at 0 and is the return value -- confirm.
110 unsigned SingleProducerTaskPoolBase::QueueSize()
const {
112 for (
unsigned i = 0; i < thread_count_; ++i) {
113 res = std::max(res, thread_interfaces_[i]->QueueSize());
// Computes an average queueing delay over all workers by summing each worker's
// queue_delay_jiffies and queue_delay_count.
// Returns 0 when count is zero, which also avoids dividing by zero.
119 uint64 SingleProducerTaskPoolBase::AverageDelayUsec()
const {
122 for (
unsigned i = 0; i < thread_count_; ++i) {
123 const ThreadLocalInterface* tli = thread_interfaces_[i].get();
124 jiffies += tli->queue_delay_jiffies;
125 count += tli->queue_delay_count;
// NOTE(review): the factor 100 presumably converts jiffies to microseconds
// (one jiffy = 100 usec), going by the function name -- confirm.
128 return count ? jiffies * 100 / count : 0;
// Worker thread entry point. arg is the RoutineConfig that LaunchThreads
// allocates; it supplies the owning pool and this worker's index.
132 void* SingleProducerTaskPoolBase::ThreadRoutine(
void* arg) {
// NOTE(review): C-style cast; static_cast<RoutineConfig*>(arg) is the
// idiomatic form for converting from void*.
133 RoutineConfig* config = (RoutineConfig*)arg;
134 SingleProducerTaskPoolBase* me = config->me;
// Per-worker state and queue interface, selected by the worker index.
135 ThreadInfo::Data& ti = me->thread_info_[config->thread_index].d;
137 ThreadLocalInterface* thread_interface = me->thread_interfaces_[config->thread_index].get();
// Predicate for the idle wait below: true when cancellation was requested or
// the queue is no longer empty.
141 auto await_check = [me, thread_interface]() {
142 return me->start_cancel_ || !thread_interface->IsQueueEmpty();
145 unsigned num_yields = 0;
// Main loop: keeps going until start_cancel_ is set.
146 while (!me->start_cancel_) {
// Mark the worker busy before draining; WaitForTasksToComplete and
// FindMostFreeThread both read has_tasks.
147 ti.has_tasks.store(
true, std::memory_order_release);
// Drain: keep calling RunTask() for as long as it returns true.
// NOTE(review): presumably true means a task was executed -- confirm.
148 while (thread_interface->RunTask()) {
151 ti.has_tasks.store(
false, std::memory_order_release);
// num_yields is incremented on each evaluation; the branch fires once it
// exceeds 100.
// NOTE(review): presumably the worker yields on earlier passes and only then
// falls through to the blocking wait -- confirm.
153 if (++num_yields > 100) {
156 if (thread_interface->IsQueueEmpty()) {
157 VLOG(2) <<
"ti.empty_q_cv.notify";
// Notify ev_task_finished (the event WaitForTasksToComplete waits on), then
// wait on ev_non_empty with await_check as the predicate.
159 ti.ev_task_finished.notify();
160 ti.ev_non_empty.await(await_check);
// Fetch this thread's name for the shutdown log line.
165 pthread_getname_np(pthread_self(), buf,
sizeof buf);
166 VLOG(1) <<
"Finishing running SingleProducerTaskPoolBase thread " << buf;