4 #include "util/fibers/fiberqueue_threadpool.h" 6 #include "absl/strings/str_cat.h" 7 #include "base/pthread_utils.h" 10 namespace fibers_ext {
11 using namespace boost;
// Constructs a FiberQueue, passing queue_size to the constructor of the
// queue_ member. The constructor body is not visible in this excerpt.
14 FiberQueue::FiberQueue(
unsigned queue_size) : queue_(queue_size) {
// FiberQueue::Run. Only fragments of the body are visible in this excerpt, so
// the notes below describe just the statements that are shown.
17 void FiberQueue::Run() {
// Local flag, initially false; how it is updated is not visible here.
18 bool is_closed =
false;
// Attempts to dequeue into func; the branch is taken when try_dequeue
// returns true.
22 if (queue_.try_dequeue(func)) {
// Acquire-load of the is_closed_ member flag.
27 if (is_closed_.load(std::memory_order_acquire)) {
41 }
// A std::exception reaching this handler is reported through LOG(FATAL).
// NOTE(review): e is only read here; catching by const reference would be the
// conventional form.
catch (std::exception& e) {
43 LOG(FATAL) <<
"Exception " << e.what();
// Sets is_closed_ to true with sequentially consistent ordering.
// NOTE(review): the header of the enclosing function is not visible in this
// excerpt; confirm which method this statement belongs to.
49 is_closed_.store(
true, memory_order_seq_cst);
// Builds the pool: allocates num_threads Worker entries and, for each one,
// creates a FiberQueue constructed with queue_size and calls
// base::StartThread with a callable bound to WorkerFunction(i).
53 FiberQueueThreadPool::FiberQueueThreadPool(
unsigned num_threads,
unsigned queue_size) {
// A num_threads of 0 is replaced with std::thread::hardware_concurrency().
// NOTE(review): hardware_concurrency() is itself allowed to return 0; confirm
// whether an empty pool is acceptable in that case.
54 if (num_threads == 0) {
55 num_threads = std::thread::hardware_concurrency();
57 worker_size_ = num_threads;
58 workers_.reset(
new Worker[num_threads]);
60 for (
unsigned i = 0; i < num_threads; ++i) {
// The name passed to StartThread is "fq_pool" followed by the worker index.
61 string name = absl::StrCat(
"fq_pool", i);
// Callable invoking this->WorkerFunction(i).
63 auto fn = std::bind(&FiberQueueThreadPool::WorkerFunction,
this, i);
// The queue is assigned before StartThread is called; WorkerFunction
// dereferences workers_[i].q.
64 workers_[i].q.reset(
new FiberQueue(queue_size));
// The value returned by StartThread is kept in tid, which Shutdown() passes
// to pthread_join.
65 workers_[i].tid = base::StartThread(name.c_str(), fn);
// Destructor. The only statement visible in this excerpt emits a VLOG(1)
// message; the rest of the body is not shown.
69 FiberQueueThreadPool::~FiberQueueThreadPool() {
70 VLOG(1) <<
"FiberQueueThreadPool::~FiberQueueThreadPool";
// Shuts the pool down. The visible fragments show two passes over the
// worker_size_ workers: one that flags and notifies every queue, and one that
// joins every worker thread.
75 void FiberQueueThreadPool::Shutdown() {
// Pass 1: set each worker queue's is_closed_ flag to true and call
// notifyAll() on its pull_ec_.
79 for (
size_t i = 0; i < worker_size_; ++i) {
80 workers_[i].q->is_closed_.store(
true, memory_order_seq_cst);
81 workers_[i].q->pull_ec_.notifyAll();
// Pass 2: join each worker thread via its stored tid.
// NOTE(review): the return value of pthread_join is ignored.
84 for (
size_t i = 0; i < worker_size_; ++i) {
85 auto& w = workers_[i];
86 pthread_join(w.tid,
nullptr);
90 VLOG(1) <<
"FiberQueueThreadPool::ShutdownEnd";
// Function bound in the constructor and handed to base::StartThread for the
// worker at position `index`. Parts of the body are not visible in this
// excerpt.
93 void FiberQueueThreadPool::WorkerFunction(
unsigned index) {
// Calls Run() on this worker's FiberQueue.
102 workers_[index].q->Run();
// Logged after Run() returns.
104 VLOG(1) <<
"FiberQueueThreadPool::Exit";
bool await(Condition condition)
void Shutdown()
Notifies the Run() function to empty the queue and exit. Does not block.