proactor_pool.cc
1 // Copyright 2020, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 
5 #include "util/uring/proactor_pool.h"
6 
7 #include "base/logging.h"
8 #include "base/pthread_utils.h"
9 
10 DEFINE_uint32(proactor_threads, 0, "Number of io threads in the pool");
11 DEFINE_bool(proactor_reuse_wq, true, "If true reuses the same work-queue for all io_urings "
12  "in the pool");
13 
14 using namespace std;
15 
16 namespace util {
17 namespace uring {
18 
19 ProactorPool::ProactorPool(std::size_t pool_size) {
20  if (pool_size == 0) {
21  pool_size = FLAGS_proactor_threads > 0 ? FLAGS_proactor_threads
22  : thread::hardware_concurrency();
23  }
24  pool_size_ = pool_size;
25  proactor_.reset(new Proactor[pool_size]);
26 }
27 
28 ProactorPool::~ProactorPool() {
29  Stop();
30 }
31 
32 void ProactorPool::CheckRunningState() {
33  CHECK_EQ(RUN, state_);
34 }
35 
36 void ProactorPool::Run(uint32_t ring_depth) {
37  CHECK_EQ(STOPPED, state_);
38 
39  char buf[32];
40 
41  auto init_proactor = [this, ring_depth, &buf](int i, int wq_fd) mutable {
42  snprintf(buf, sizeof(buf), "Proactor%u", i);
43  auto cb = [ptr = &proactor_[i], ring_depth]() { ptr->Run(ring_depth); };
44  pthread_t tid = base::StartThread(buf, cb);
45  cpu_set_t cps;
46  CPU_ZERO(&cps);
47  CPU_SET(i % thread::hardware_concurrency(), &cps);
48 
49  int rc = pthread_setaffinity_np(tid, sizeof(cpu_set_t), &cps);
50  LOG_IF(WARNING, rc) << "Error calling pthread_setaffinity_np: "
51  << strerror(rc) << "\n";
52  };
53  init_proactor(0, -1);
54  int wq_fd = FLAGS_proactor_reuse_wq ? proactor_[0].ring_fd() : -1;
55 
56  for (size_t i = 1; i < pool_size_; ++i) {
57  init_proactor(i, wq_fd);
58  }
59  state_ = RUN;
60 
61  AwaitOnAll([](unsigned index, Proactor*) {
62  Proactor::SetIndex(index);
63  });
64 
65  LOG(INFO) << "Running " << pool_size_ << " io threads";
66 }
67 
68 void ProactorPool::Stop() {
69  if (state_ == STOPPED)
70  return;
71 
72  for (size_t i = 0; i < pool_size_; ++i) {
73  proactor_[i].Stop();
74  }
75 
76  VLOG(1) << "Proactors have been stopped";
77 
78  for (size_t i = 0; i < pool_size_; ++i) {
79  pthread_join(proactor_[i].thread_id(), nullptr);
80  VLOG(2) << "Thread " << i << " has joined";
81  }
82  state_ = STOPPED;
83 }
84 
85 Proactor* ProactorPool::GetNextProactor() {
86  uint32_t index = next_io_context_.load(std::memory_order_relaxed);
87  // Use a round-robin scheme to choose the next io_context to use.
88  DCHECK_LT(index, pool_size_);
89 
90  Proactor& proactor = at(index++);
91 
92  // Not-perfect round-robind since this function is non-transactional but it "works".
93  if (index >= pool_size_)
94  index = 0;
95 
96  next_io_context_.store(index, std::memory_order_relaxed);
97  return &proactor;
98 }
99 
100 absl::string_view ProactorPool::GetString(absl::string_view source) {
101  if (source.empty()) {
102  return source;
103  }
104 
105  folly::RWSpinLock::ReadHolder rh(str_lock_);
106  auto it = str_set_.find(source);
107  if (it != str_set_.end())
108  return *it;
109  rh.reset();
110 
111  str_lock_.lock();
112  char* str = arena_.Allocate(source.size());
113  memcpy(str, source.data(), source.size());
114 
115  absl::string_view res(str, source.size());
116  str_set_.insert(res);
117  str_lock_.unlock();
118 
119  return res;
120 }
121 
122 } // namespace uring
123 } // namespace util