io_context_pool.h
1 // Copyright 2018, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 
5 #pragma once
6 
7 #include <thread>
8 #include <vector>
9 
10 #include <absl/types/optional.h>
11 #include <boost/asio/executor_work_guard.hpp>
12 #include <boost/fiber/fiber.hpp>
13 
14 #include "base/type_traits.h"
15 #include "util/asio/io_context.h"
16 #include "util/fibers/fibers_ext.h"
17 
18 namespace util {
19 
27  template <typename Func, typename... Args>
28  using AcceptArgsCheck =
29  typename std::enable_if<base::is_invocable<Func, Args...>::value, int>::type;
30 
31  public:
32  using io_context = ::boost::asio::io_context;
33 
34  IoContextPool(const IoContextPool&) = delete;
35  void operator=(const IoContextPool&) = delete;
36 
39  explicit IoContextPool(std::size_t pool_size = 0, std::vector<size_t> cpus = {});
40 
41  ~IoContextPool();
42 
44  void Run();
45 
51  void Stop();
52 
55 
56  IoContext& operator[](size_t i) { return context_arr_[i]; }
57  IoContext& at(size_t i) { return context_arr_[i]; }
58 
59  size_t size() const { return context_arr_.size(); }
60 
67  template <typename Func, AcceptArgsCheck<Func, IoContext&> = 0> void AsyncOnAll(Func&& func) {
68  CheckRunningState();
69  for (unsigned i = 0; i < size(); ++i) {
70  IoContext& context = context_arr_[i];
71  // func must be copied, it can not be moved, because we dsitribute it into multiple
72  // IoContexts.
73  context.Async([&context, func] () mutable { func(context); });
74  }
75  }
76 
83  template <typename Func, AcceptArgsCheck<Func, unsigned, IoContext&> = 0>
84  void AsyncOnAll(Func&& func) {
85  CheckRunningState();
86  for (unsigned i = 0; i < size(); ++i) {
87  IoContext& context = context_arr_[i];
88  // Copy func on purpose, see above.
89  context.Async([&context, i, func] () mutable { func(i, context); });
90  }
91  }
92 
99  template <typename Func, AcceptArgsCheck<Func, IoContext&> = 0> void AwaitOnAll(Func&& func) {
100  fibers_ext::BlockingCounter bc(size());
101  auto cb = [func = std::forward<Func>(func), bc](IoContext& context) mutable {
102  func(context);
103  bc.Dec();
104  };
105  AsyncOnAll(std::move(cb));
106  bc.Wait();
107  }
108 
114  template <typename Func, AcceptArgsCheck<Func, unsigned, IoContext&> = 0>
115  void AwaitOnAll(Func&& func) {
116  fibers_ext::BlockingCounter bc(size());
117  auto cb = [func = std::forward<Func>(func), bc](unsigned index, IoContext& context) mutable {
118  func(index, context);
119  bc.Dec();
120  };
121  AsyncOnAll(std::move(cb));
122  bc.Wait();
123  }
124 
133  template <typename Func, AcceptArgsCheck<Func, unsigned, IoContext&> = 0> void AsyncFiberOnAll(Func&& func) {
134  AsyncOnAll([func = std::forward<Func>(func)](unsigned i, IoContext& context) {
135  ::boost::fibers::fiber(func, i, std::ref(context)).detach();
136  });
137  }
138 
147  template <typename Func, AcceptArgsCheck<Func, IoContext&> = 0> void AsyncFiberOnAll(Func&& func) {
148  AsyncOnAll([func = std::forward<Func>(func)](IoContext& context) {
149  ::boost::fibers::fiber(func, std::ref(context)).detach();
150  });
151  }
152 
161  template <typename Func, AcceptArgsCheck<Func, unsigned, IoContext&> = 0>
162  void AwaitFiberOnAll(Func&& func) {
163  fibers_ext::BlockingCounter bc(size());
164  auto cb = [func = std::forward<Func>(func), bc](unsigned i, IoContext& context) mutable {
165  func(i, context);
166  bc.Dec();
167  };
168  AsyncFiberOnAll(std::move(cb));
169  bc.Wait();
170  }
171 
180  template <typename Func, AcceptArgsCheck<Func, IoContext&> = 0> void AwaitFiberOnAll(Func&& func) {
181  fibers_ext::BlockingCounter bc(size());
182  auto cb = [func = std::forward<Func>(func), bc](IoContext& context) mutable {
183  func(context);
184  bc.Dec();
185  };
186  AsyncFiberOnAll(std::move(cb));
187  bc.Wait();
188  }
189 
198  template <typename Func> void AwaitFiberOnAllSerially(Func&& func) {
199  boost::fibers::mutex mu; // Enforces only one thread.
200  AwaitFiberOnAll([&mu,func](auto&& args) { // We copy func on purpose, see ebove.
201  std::lock_guard<boost::fibers::mutex> lk(mu);
202  func(std::forward<decltype(args)>(args));
203  });
204  }
205 
206  IoContext* GetThisContext();
207 
208  private:
209  void WrapLoop(size_t index, fibers_ext::BlockingCounter* bc);
210  void CheckRunningState();
211 
212  typedef ::boost::asio::executor_work_guard<IoContext::io_context::executor_type> work_guard_t;
213 
214  std::vector<size_t> cpu_idx_arr_;
215  std::vector<IoContext> context_arr_;
216  struct TInfo {
217  pthread_t tid = 0;
218  absl::optional<work_guard_t> work;
219  };
220 
221  std::vector<TInfo> thread_arr_;
222 
224  std::atomic_uint_fast32_t next_io_context_{0};
225  thread_local static size_t context_indx_;
226  enum State { STOPPED, RUN } state_ = STOPPED;
227 };
228 
229 } // namespace util
void AsyncFiberOnAll(Func &&func)
Runs func in a fiber asynchronously. func must accept IoContext&. func may fiber-block.
void AwaitOnAll(Func &&func)
Runs the function in all IO threads asynchronously. Blocks until all the asynchronous calls return.
void AwaitFiberOnAll(Func &&func)
Runs func wrapped in fiber on all IO threads in parallel. func must accept IoContext&....
void Stop()
Stops all io_context objects in the pool.
IoContext & GetNextContext()
Get an io_context to use. Thread-safe.
void AwaitFiberOnAllSerially(Func &&func)
Runs func wrapped in fiber on all IO threads, SERIALLY. func must accept IoContext&....
void Run()
Starts running all IoContext objects in the pool. Does not block.
void AsyncOnAll(Func &&func)
Runs func in all IO threads asynchronously.
A pool of IoContext objects, representing and managing CPU resources of the system.