fiberqueue_threadpool.cc
1 // Copyright 2019, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 #include "util/fibers/fiberqueue_threadpool.h"
5 
6 #include "absl/strings/str_cat.h"
7 #include "base/pthread_utils.h"
8 
9 namespace util {
10 namespace fibers_ext {
11 using namespace boost;
12 using namespace std;
13 
14 FiberQueue::FiberQueue(unsigned queue_size) : queue_(queue_size) {
15 }
16 
17 void FiberQueue::Run() {
18  bool is_closed = false;
19  CbFunc func;
20 
21  auto cb = [&] {
22  if (queue_.try_dequeue(func)) {
23  push_ec_.notify();
24  return true;
25  }
26 
27  if (is_closed_.load(std::memory_order_acquire)) {
28  is_closed = true;
29  return true;
30  }
31  return false;
32  };
33 
34  while (true) {
35  pull_ec_.await(cb);
36 
37  if (is_closed)
38  break;
39  try {
40  func();
41  } catch (std::exception& e) {
42  // std::exception_ptr p = std::current_exception();
43  LOG(FATAL) << "Exception " << e.what();
44  }
45  }
46 }
47 
49  is_closed_.store(true, memory_order_seq_cst);
50  pull_ec_.notify();
51 }
52 
53 FiberQueueThreadPool::FiberQueueThreadPool(unsigned num_threads, unsigned queue_size) {
54  if (num_threads == 0) {
55  num_threads = std::thread::hardware_concurrency();
56  }
57  worker_size_ = num_threads;
58  workers_.reset(new Worker[num_threads]);
59 
60  for (unsigned i = 0; i < num_threads; ++i) {
61  string name = absl::StrCat("fq_pool", i);
62 
63  auto fn = std::bind(&FiberQueueThreadPool::WorkerFunction, this, i);
64  workers_[i].q.reset(new FiberQueue(queue_size));
65  workers_[i].tid = base::StartThread(name.c_str(), fn);
66  }
67 }
68 
69 FiberQueueThreadPool::~FiberQueueThreadPool() {
70  VLOG(1) << "FiberQueueThreadPool::~FiberQueueThreadPool";
71 
72  Shutdown();
73 }
74 
75 void FiberQueueThreadPool::Shutdown() {
76  if (!workers_)
77  return;
78 
79  for (size_t i = 0; i < worker_size_; ++i) {
80  workers_[i].q->is_closed_.store(true, memory_order_seq_cst);
81  workers_[i].q->pull_ec_.notifyAll();
82  }
83 
84  for (size_t i = 0; i < worker_size_; ++i) {
85  auto& w = workers_[i];
86  pthread_join(w.tid, nullptr);
87  }
88 
89  workers_.reset();
90  VLOG(1) << "FiberQueueThreadPool::ShutdownEnd";
91 }
92 
93 void FiberQueueThreadPool::WorkerFunction(unsigned index) {
94  /*
95  sched_param param;
96  param.sched_priority = 1;
97  int err = pthread_setschedparam(pthread_self(), SCHED_FIFO, &param);
98  if (err) {
99  LOG(INFO) << "Could not set FIFO priority in fiber-queue-thread";
100  }*/
101 
102  workers_[index].q->Run();
103 
104  VLOG(1) << "FiberQueueThreadPool::Exit";
105 }
106 
107 } // namespace fibers_ext
108 } // namespace util
bool await(Condition condition)
Definition: event_count.h:178
void Shutdown()
Notifies Run() function to empty the queue and to exit. Does not block.