5 #include "util/uring/proactor_pool.h" 7 #include "base/logging.h" 8 #include "base/pthread_utils.h" 10 DEFINE_uint32(proactor_threads, 0,
"Number of io threads in the pool");
11 DEFINE_bool(proactor_reuse_wq,
true,
"If true reuses the same work-queue for all io_urings " 19 ProactorPool::ProactorPool(std::size_t pool_size) {
21 pool_size = FLAGS_proactor_threads > 0 ? FLAGS_proactor_threads
22 : thread::hardware_concurrency();
24 pool_size_ = pool_size;
25 proactor_.reset(
new Proactor[pool_size]);
28 ProactorPool::~ProactorPool() {
32 void ProactorPool::CheckRunningState() {
33 CHECK_EQ(RUN, state_);
36 void ProactorPool::Run(uint32_t ring_depth) {
37 CHECK_EQ(STOPPED, state_);
41 auto init_proactor = [
this, ring_depth, &buf](
int i,
int wq_fd)
mutable {
42 snprintf(buf,
sizeof(buf),
"Proactor%u", i);
43 auto cb = [ptr = &proactor_[i], ring_depth]() { ptr->Run(ring_depth); };
44 pthread_t tid = base::StartThread(buf, cb);
47 CPU_SET(i % thread::hardware_concurrency(), &cps);
49 int rc = pthread_setaffinity_np(tid,
sizeof(cpu_set_t), &cps);
50 LOG_IF(WARNING, rc) <<
"Error calling pthread_setaffinity_np: " 51 << strerror(rc) <<
"\n";
54 int wq_fd = FLAGS_proactor_reuse_wq ? proactor_[0].ring_fd() : -1;
56 for (
size_t i = 1; i < pool_size_; ++i) {
57 init_proactor(i, wq_fd);
61 AwaitOnAll([](
unsigned index,
Proactor*) {
62 Proactor::SetIndex(index);
65 LOG(INFO) <<
"Running " << pool_size_ <<
" io threads";
68 void ProactorPool::Stop() {
69 if (state_ == STOPPED)
72 for (
size_t i = 0; i < pool_size_; ++i) {
76 VLOG(1) <<
"Proactors have been stopped";
78 for (
size_t i = 0; i < pool_size_; ++i) {
79 pthread_join(proactor_[i].thread_id(),
nullptr);
80 VLOG(2) <<
"Thread " << i <<
" has joined";
86 uint32_t index = next_io_context_.load(std::memory_order_relaxed);
88 DCHECK_LT(index, pool_size_);
93 if (index >= pool_size_)
96 next_io_context_.store(index, std::memory_order_relaxed);
100 absl::string_view ProactorPool::GetString(absl::string_view source) {
101 if (source.empty()) {
105 folly::RWSpinLock::ReadHolder rh(str_lock_);
106 auto it = str_set_.find(source);
107 if (it != str_set_.end())
112 char* str = arena_.Allocate(source.size());
113 memcpy(str, source.data(), source.size());
115 absl::string_view res(str, source.size());
116 str_set_.insert(res);