5 #include "util/asio/io_context_pool.h" 8 #include <boost/asio/steady_timer.hpp> 9 #include <boost/fiber/mutex.hpp> 10 #include <boost/fiber/scheduler.hpp> 12 #include "base/logging.h" 13 #include "base/pthread_utils.h" 15 using namespace boost;
// Command-line flag `io_context_threads` (uint32, default 0).
// It is read in this file as FLAGS_io_context_threads; the constructor only
// uses the flag value when it is greater than zero and otherwise falls back
// to thread::hardware_concurrency().
18 DEFINE_uint32(io_context_threads, 0,
"Number of io threads in the pool");
// Out-of-class definition of the thread_local member context_indx_, with an
// initial value of 0 in every thread. WrapLoop() overwrites it with the index
// it receives.
22 thread_local
size_t IoContextPool::context_indx_ = 0;
// Sizes the pool's bookkeeping arrays.
//
//   pool_size - number of slots that context_arr_ and thread_arr_ are resized to.
//   cpus      - CPU indices, taken by value and moved into cpu_idx_arr_; the
//               CHECK_EQ below requires exactly one entry per pool slot.
//
// NOTE(review): parts of this definition are not visible here (the statement
// that consumes the ternary expression and the body of the `for` loop), so the
// comments below describe only what can be read directly.
24 IoContextPool::IoContextPool(
size_t pool_size, std::vector<size_t> cpus) {
// Default thread count: the flag value when it is positive, otherwise the
// value reported by thread::hardware_concurrency().
// NOTE(review): the assignment target is not visible - presumably pool_size
// when the caller passed 0; confirm. hardware_concurrency() may report 0 on
// some platforms - TODO confirm that case is handled.
27 FLAGS_io_context_threads > 0 ? FLAGS_io_context_threads : thread::hardware_concurrency();
// NOTE(review): loop body not visible - presumably fills a default `cpus`
// list when none was supplied; verify against the full file.
30 for (
size_t i = 0; i < pool_size; ++i)
// Hard check that one CPU index was provided per pool slot.
33 CHECK_EQ(pool_size, cpus.size());
// Keep the CPU list for Run(), which reads cpu_idx_arr_[i] when pinning threads.
34 cpu_idx_arr_ = std::move(cpus);
// Both arrays are indexed by the same slot number throughout this file.
35 context_arr_.resize(pool_size);
36 thread_arr_.resize(pool_size);
// Destructor: delegates all teardown to Stop(), which itself checks whether
// the pool is already in the STOPPED state.
39 IoContextPool::~IoContextPool() { Stop(); }
// Entry point run by Run() on each pool thread.
//
//   index - slot in context_arr_ that this thread serves.
//   bc    - forwarded unchanged to IoContext::StartLoop().
//           NOTE(review): presumably signalled once the loop is up so that the
//           starter can wait for all threads - confirm in IoContext.
//
// Logs at VLOG(1) immediately before StartLoop() is called and after it returns.
41 void IoContextPool::WrapLoop(
size_t index, fibers_ext::BlockingCounter* bc) {
// Remember, in thread-local storage, which slot this thread is running.
42 context_indx_ = index;
44 auto& context = context_arr_[index];
45 VLOG(1) <<
"Starting io thread " << index;
// The "Finished" message below is only reached after this call returns.
47 context.StartLoop(bc);
49 VLOG(1) <<
"Finished io thread " << index;
// Asserts via CHECK_EQ that the pool's state_ is RUN.
52 void IoContextPool::CheckRunningState() {
53 CHECK_EQ(RUN, state_);
// Starts one named thread per pool slot, each running WrapLoop(i, ...), and
// tries to pin it to a CPU taken from cpu_idx_arr_. Requires the pool to be
// STOPPED on entry (CHECK_EQ).
//
// NOTE(review): several statements are not visible here - the declarations of
// `buf`, `bc` and `cps`, the store into thread_arr_[i].tid, any wait on `bc`,
// and the transition of state_. The comments below cover only visible lines.
56 void IoContextPool::Run() {
57 CHECK_EQ(STOPPED, state_);
62 for (
size_t i = 0; i < thread_arr_.size(); ++i) {
// Install an Asio work guard on this slot's io_context.
// NOTE(review): presumably this keeps the context's run loop from returning
// while it has no pending work - confirm against the Boost.Asio docs.
63 thread_arr_[i].work.emplace(asio::make_work_guard(*context_arr_[i].context_ptr_));
// Thread name is "IoPool<i>".
// NOTE(review): `i` is size_t but the format uses "%lu"; "%zu" is the
// matching specifier - confirm and fix for non-LP64 targets.
64 snprintf(buf,
sizeof(buf),
"IoPool%lu", i);
// The lambda captures `bc` by value and passes the address of its own copy.
// NOTE(review): this relies on BlockingCounter copies sharing their state -
// confirm in fibers_ext.
66 base::StartThread(buf, [
this, i, bc]()
mutable { this->WrapLoop(i, &bc); });
// Wrap the configured CPU index into the range of available hardware threads.
// NOTE(review): if hardware_concurrency() reports 0 this is a modulo by
// zero - TODO confirm.
69 CPU_SET(cpu_idx_arr_[i] % thread::hardware_concurrency(), &cps);
// Affinity failure is non-fatal: it is only logged as a warning.
71 int rc = pthread_setaffinity_np(thread_arr_[i].tid,
sizeof(cpu_set_t), &cps);
72 LOG_IF(WARNING, rc) <<
"Error calling pthread_setaffinity_np: " << strerror(rc) <<
"\n";
80 LOG(INFO) <<
"Running " << thread_arr_.size() <<
" io threads";
// Shuts the pool down: calls Stop() on every context, then joins every pool
// thread with pthread_join.
//
// NOTE(review): the body of the STOPPED check, the body of the range-for over
// thread_arr_, and any update of state_ are not visible here.
84 void IoContextPool::Stop() {
// NOTE(review): presumably an early return when already stopped - confirm.
85 if (state_ == STOPPED)
// Phase 1: ask each context to stop.
88 for (
size_t i = 0; i < context_arr_.size(); ++i) {
89 context_arr_[i].Stop();
// NOTE(review): loop body not visible - presumably releases each TInfo's work
// guard (set in Run()) so the run loops can return; verify.
92 for (TInfo& tinfo : thread_arr_) {
95 VLOG(1) <<
"Asio Contexts has been stopped";
// Phase 2: wait for every pool thread to exit; the thread's return value is
// discarded (nullptr) and pthread_join's result is not checked.
97 for (
size_t i = 0; i < thread_arr_.size(); ++i) {
98 pthread_join(thread_arr_[i].tid,
nullptr);
99 VLOG(2) <<
"Thread " << i <<
" has joined";
// NOTE(review): the header of the enclosing function is not visible here.
// The visible statements select context_arr_[next_io_context_] and then
// advance the stored index, resetting it to 0 on reaching the end of the array.
// Debug-only bound check on the stored index.
106 DCHECK_LT(next_io_context_, context_arr_.size());
// The index is read here and written back below as separate steps.
// NOTE(review): if this can be called from several threads at once, two
// callers may observe the same index - confirm whether that is acceptable.
107 uint32_t index = next_io_context_.load();
// Post-increment: `io_context` refers to the current slot, `index` now names the next one.
108 IoContext& io_context = context_arr_[index++];
// Wrap around at the end of the array.
// NOTE(review): an `else` between the two stores below is presumably present
// in the full file; as shown here the second store would always win.
111 if (index == context_arr_.size())
112 next_io_context_ = 0;
114 next_io_context_ = index;
118 IoContext* IoContextPool::GetThisContext() {
119 CHECK_EQ(state_, RUN);
120 pthread_t
self = pthread_self();
122 for (
size_t i = 0; i < thread_arr_.size(); ++i) {
123 if (thread_arr_[i].tid ==
self) {
124 return &context_arr_[i];