proactor_pool.h
1 // Copyright 2020, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 
5 #pragma once
6 
7 #include <pthread.h>
8 
9 #include <vector>
10 
11 #include "absl/container/flat_hash_set.h"
12 #include "absl/strings/string_view.h"
13 #include "base/arena.h"
14 #include "base/type_traits.h"
15 #include "base/RWSpinLock.h"
16 #include "util/uring/proactor.h"
17 
18 namespace util {
19 namespace uring {
20 
21 class ProactorPool {
22  template <typename Func, typename... Args>
23  using AcceptArgsCheck =
24  typename std::enable_if<base::is_invocable<Func, Args...>::value,
25  int>::type;
26 
27  public:
28  ProactorPool(const ProactorPool&) = delete;
29  void operator=(const ProactorPool&) = delete;
30 
34  explicit ProactorPool(std::size_t pool_size = 0);
35 
36  ~ProactorPool();
37 
40  void Run(uint32_t ring_depth = 256);
41 
47  void Stop();
48 
51 
52  Proactor& operator[](size_t i) {
53  return at(i);
54  }
55 
56  Proactor& at(size_t i) {
57  return proactor_[i];
58  }
59 
60  size_t size() const {
61  return pool_size_;
62  }
63 
70  template <typename Func, AcceptArgsCheck<Func, Proactor*> = 0>
71  void AsyncOnAll(Func&& func) {
72  CheckRunningState();
73  for (unsigned i = 0; i < size(); ++i) {
74  Proactor& context = proactor_[i];
75  // func must be copied, it can not be moved, because we dsitribute it into
76  // multiple Proactors.
77  context.AsyncBrief([&context, func]() mutable { func(&context); });
78  }
79  }
80 
88  template <typename Func, AcceptArgsCheck<Func, unsigned, Proactor*> = 0>
89  void AsyncOnAll(Func&& func) {
90  CheckRunningState();
91  for (unsigned i = 0; i < size(); ++i) {
92  Proactor& context = proactor_[i];
93  // Copy func on purpose, see above.
94  context.AsyncBrief([&context, i, func]() mutable { func(i, &context); });
95  }
96  }
97 
104  template <typename Func, AcceptArgsCheck<Func, Proactor*> = 0>
105  void AwaitOnAll(Func&& func) {
106  fibers_ext::BlockingCounter bc(size());
107  auto cb = [func = std::forward<Func>(func), bc](Proactor* context) mutable {
108  func(context);
109  bc.Dec();
110  };
111  AsyncOnAll(std::move(cb));
112  bc.Wait();
113  }
114 
120  template <typename Func, AcceptArgsCheck<Func, unsigned, Proactor*> = 0>
121  void AwaitOnAll(Func&& func) {
122  fibers_ext::BlockingCounter bc(size());
123  auto cb = [func = std::forward<Func>(func), bc](unsigned index,
124  Proactor* context) mutable {
125  func(index, context);
126  bc.Dec();
127  };
128  AsyncOnAll(std::move(cb));
129  bc.Wait();
130  }
131 
140  template <typename Func, AcceptArgsCheck<Func, unsigned, Proactor*> = 0>
141  void AsyncFiberOnAll(Func&& func) {
142  AsyncOnAll(
143  [func = std::forward<Func>(func)](unsigned i, Proactor* context) {
144  ::boost::fibers::fiber(func, i, context).detach();
145  });
146  }
147 
156  template <typename Func, AcceptArgsCheck<Func, Proactor*> = 0>
157  void AsyncFiberOnAll(Func&& func) {
158  AsyncOnAll([func = std::forward<Func>(func)](Proactor* context) {
159  ::boost::fibers::fiber(func, context).detach();
160  });
161  }
162 
171  template <typename Func, AcceptArgsCheck<Func, unsigned, Proactor*> = 0>
172  void AwaitFiberOnAll(Func&& func) {
173  fibers_ext::BlockingCounter bc(size());
174  auto cb = [func = std::forward<Func>(func), bc](unsigned i,
175  Proactor* context) mutable {
176  func(i, context);
177  bc.Dec();
178  };
179  AsyncFiberOnAll(std::move(cb));
180  bc.Wait();
181  }
182 
191  template <typename Func, AcceptArgsCheck<Func, Proactor*> = 0>
192  void AwaitFiberOnAll(Func&& func) {
193  fibers_ext::BlockingCounter bc(size());
194  auto cb = [func = std::forward<Func>(func), bc](Proactor* context) mutable {
195  func(context);
196  bc.Dec();
197  };
198  AsyncFiberOnAll(std::move(cb));
199  bc.Wait();
200  }
201 
202  Proactor* GetLocalProactor();
203 
204  // Auxillary functions
205 
206  // Returns a string owned by pool's global storage. Allocates only once for each new string blob.
207  // Currently has average performance and it employs RW spinlock underneath.
208  absl::string_view GetString(absl::string_view source);
209 
210  private:
211  void WrapLoop(size_t index, fibers_ext::BlockingCounter* bc);
212  void CheckRunningState();
213 
214  std::unique_ptr<Proactor[]> proactor_;
215 
217  std::atomic_uint_fast32_t next_io_context_{0};
218  uint32_t pool_size_;
219 
220  folly::RWSpinLock str_lock_;
221  absl::flat_hash_set<absl::string_view> str_set_;
222  base::Arena arena_;
223 
224  enum State { STOPPED, RUN } state_ = STOPPED;
225 };
226 
227 constexpr size_t foo = sizeof(ProactorPool);
228 
229 } // namespace uring
230 } // namespace util
void AwaitOnAll(Func &&func)
Runs the function in all IO threads asynchronously. Blocks until all the asynchronous calls return.
void AsyncFiberOnAll(Func &&func)
Runs func in a detached fiber on every IO thread asynchronously. func must accept Proactor* (optionally preceded by the unsigned proactor index). func may fiber-block.
void Stop()
Stops all io_context objects in the pool.
Proactor * GetNextProactor()
Get a Proactor to use. Thread-safe.
void AwaitFiberOnAll(Func &&func)
Runs func wrapped in a fiber on all IO threads in parallel. func must accept Proactor* (optionally preceded by the unsigned proactor index). Blocks until every invocation has returned.
void AsyncBrief(Func &&brief)
Definition: proactor.h:229
void Run(uint32_t ring_depth=256)
void AsyncOnAll(Func &&func)
Runs func in all IO threads asynchronously.
Definition: proactor_pool.h:71