io_context_pool.cc
1 // Copyright 2018, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 
5 #include "util/asio/io_context_pool.h"
6 
7 #include <sched.h>
8 #include <boost/asio/steady_timer.hpp>
9 #include <boost/fiber/mutex.hpp>
10 #include <boost/fiber/scheduler.hpp>
11 
12 #include "base/logging.h"
13 #include "base/pthread_utils.h"
14 
15 using namespace boost;
16 using std::thread;
17 
// Command-line override for the pool size. IoContextPool's constructor reads it only when
// it is called with pool_size == 0; a flag value of 0 defers to
// std::thread::hardware_concurrency().
18 DEFINE_uint32(io_context_threads, 0, "Number of io threads in the pool");
19 
20 namespace util {
21 
// Per-thread slot holding the index of the IoContext served by the current thread.
// WrapLoop() assigns it on every pool thread; it stays 0 on threads that never ran WrapLoop().
22 thread_local size_t IoContextPool::context_indx_ = 0;
23 
24 IoContextPool::IoContextPool(size_t pool_size, std::vector<size_t> cpus) {
25  if (pool_size == 0) {
26  pool_size =
27  FLAGS_io_context_threads > 0 ? FLAGS_io_context_threads : thread::hardware_concurrency();
28  }
29  if (cpus.empty()) {
30  for (size_t i = 0; i < pool_size; ++i)
31  cpus.push_back(i);
32  }
33  CHECK_EQ(pool_size, cpus.size());
34  cpu_idx_arr_ = std::move(cpus);
35  context_arr_.resize(pool_size);
36  thread_arr_.resize(pool_size);
37 }
38 
39 IoContextPool::~IoContextPool() { Stop(); }
40 
41 void IoContextPool::WrapLoop(size_t index, fibers_ext::BlockingCounter* bc) {
42  context_indx_ = index;
43 
44  auto& context = context_arr_[index];
45  VLOG(1) << "Starting io thread " << index;
46 
47  context.StartLoop(bc);
48 
49  VLOG(1) << "Finished io thread " << index;
50 }
51 
// Aborts via CHECK_EQ unless the pool is in the RUN state, i.e. Run() has completed and
// Stop() has not been executed since.
52 void IoContextPool::CheckRunningState() {
53  CHECK_EQ(RUN, state_);
54 }
55 
56 void IoContextPool::Run() {
57  CHECK_EQ(STOPPED, state_);
58 
59  fibers_ext::BlockingCounter bc(thread_arr_.size());
60  char buf[32];
61 
62  for (size_t i = 0; i < thread_arr_.size(); ++i) {
63  thread_arr_[i].work.emplace(asio::make_work_guard(*context_arr_[i].context_ptr_));
64  snprintf(buf, sizeof(buf), "IoPool%lu", i);
65  thread_arr_[i].tid =
66  base::StartThread(buf, [this, i, bc]() mutable { this->WrapLoop(i, &bc); });
67  cpu_set_t cps;
68  CPU_ZERO(&cps);
69  CPU_SET(cpu_idx_arr_[i] % thread::hardware_concurrency(), &cps);
70 
71  int rc = pthread_setaffinity_np(thread_arr_[i].tid, sizeof(cpu_set_t), &cps);
72  LOG_IF(WARNING, rc) << "Error calling pthread_setaffinity_np: " << strerror(rc) << "\n";
73  }
74 
75  // We can not use Await() here yet because StartLoop might not run yet and its implementation
76  // assumes internally that the first posted handler is issued from the StartLoop.
77  // Therefore we use BlockingCounter to wait for all the IO loops to start running.
78  bc.Wait();
79 
80  LOG(INFO) << "Running " << thread_arr_.size() << " io threads";
81  state_ = RUN;
82 }
83 
84 void IoContextPool::Stop() {
85  if (state_ == STOPPED)
86  return;
87 
88  for (size_t i = 0; i < context_arr_.size(); ++i) {
89  context_arr_[i].Stop();
90  }
91 
92  for (TInfo& tinfo : thread_arr_) {
93  tinfo.work.reset();
94  }
95  VLOG(1) << "Asio Contexts has been stopped";
96 
97  for (size_t i = 0; i < thread_arr_.size(); ++i) {
98  pthread_join(thread_arr_[i].tid, nullptr);
99  VLOG(2) << "Thread " << i << " has joined";
100  }
101  state_ = STOPPED;
102 }
103 
104 IoContext& IoContextPool::GetNextContext() {
105  // Use a round-robin scheme to choose the next io_context to use.
106  DCHECK_LT(next_io_context_, context_arr_.size());
107  uint32_t index = next_io_context_.load();
108  IoContext& io_context = context_arr_[index++];
109 
110  // Not-perfect round-robind since this function is non-transactional but it's valid.
111  if (index == context_arr_.size())
112  next_io_context_ = 0;
113  else
114  next_io_context_ = index;
115  return io_context;
116 }
117 
118 IoContext* IoContextPool::GetThisContext() {
119  CHECK_EQ(state_, RUN);
120  pthread_t self = pthread_self();
121 
122  for (size_t i = 0; i < thread_arr_.size(); ++i) {
123  if (thread_arr_[i].tid == self) {
124  return &context_arr_[i];
125  }
126  }
127  return nullptr;
128 }
129 
130 } // namespace util