proactor.h
1 // Copyright 2020, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 
5 #pragma once
6 
7 #include <liburing.h>
8 #include <pthread.h>
9 
10 #include <boost/fiber/fiber.hpp>
11 #include <functional>
12 
13 #include "base/function2.hpp"
14 #include "base/mpmc_bounded_queue.h"
15 #include "util/fibers/event_count.h"
16 #include "util/fibers/fibers_ext.h"
17 #include "util/uring/submit_entry.h"
18 
19 namespace util {
20 namespace uring {
21 
22 class UringFiberAlgo;
23 class ProactorPool;
24 
25 class Proactor {
26  Proactor(const Proactor&) = delete;
27  void operator=(const Proactor&) = delete;
28 
29  public:
30  Proactor();
31  ~Proactor();
32 
33  // Runs the poll-loop. Stalls the calling thread which will become the "Proactor" thread.
34  void Run(unsigned ring_depth = 512, int wq_fd = -1);
35 
37  void Stop();
38 
39  using IoResult = int;
40 
41  // IoResult is the I/O result of the completion event.
42  // int64_t is the payload supplied during event submission. See GetSubmitEntry below.
43  using CbType = std::function<void(IoResult, int64_t, Proactor*)>;
44 
61  SubmitEntry GetSubmitEntry(CbType cb, int64_t payload);
62 
69  bool InMyThread() const {
70  return pthread_self() == thread_id_;
71  }
72 
73  auto thread_id() const {
74  return thread_id_;
75  }
76 
77  static bool IsProactorThread() {
78  return tl_info_.is_proactor_thread;
79  }
80 
81  // Returns an approximate (cached) time with nano-sec granularity.
82  // The caller must run in the same thread as the proactor.
83  static uint64_t GetMonotonicTimeNs() {
84  return tl_info_.monotonic_time;
85  }
86 
87  // Returns an 0 <= index < N, where N is the number of proactor threads in the pool of called
88  // from Proactor thread. Returns -1 if called from some other thread.
89  static int32_t GetIndex() {
90  return tl_info_.proactor_index;
91  }
92 
93  // Internal, used by ProactorPool
94  static void SetIndex(uint32_t index) {
95  tl_info_.proactor_index = index;
96  }
97 
98 
99  bool HasFastPoll() const {
100  return fast_poll_f_;
101  }
102 
107  template <typename Func> void AsyncBrief(Func&& brief);
111 
113  template <typename Func> auto AwaitBrief(Func&& brief) -> decltype(brief());
114 
117  template <typename Func, typename... Args> void AsyncFiber(Func&& f, Args&&... args) {
118  // Ideally we want to forward args into lambda but it's too complicated before C++20.
119  // So I just copy them into capture.
120  // We forward captured variables so we need lambda to be mutable.
121  AsyncBrief([f = std::forward<Func>(f), args...]() mutable {
122  ::boost::fibers::fiber(std::forward<Func>(f), std::forward<Args>(args)...).detach();
123  });
124  }
125 
126  // Please note that this function uses Await, therefore can not be used inside
127  // Proactor main fiber (i.e. Async callbacks).
128  template <typename... Args> boost::fibers::fiber LaunchFiber(Args&&... args) {
129  ::boost::fibers::fiber fb;
130 
131  // It's safe to use & capture since we await before returning.
132  AwaitBrief([&] { fb = boost::fibers::fiber(std::forward<Args>(args)...); });
133  return fb;
134  }
135 
136  // Runs possibly awating function 'f' safely in Proactor thread and waits for it to finish,
137  // If we are in his thread already, runs 'f' directly, otherwise
138  // runs it wrapped in a fiber. Should be used instead of 'AwaitBrief' when 'f' itself
139  // awaits on something.
140  // To summarize: 'f' may not block its thread, but allowed to block its fiber.
141  template <typename Func> auto AwaitBlocking(Func&& f) -> decltype(f());
142 
143  void RegisterSignal(std::initializer_list<uint16_t> l, std::function<void(int)> cb);
144 
145  void ClearSignal(std::initializer_list<uint16_t> l) {
146  RegisterSignal(l, nullptr);
147  }
148 
149  int ring_fd() const { return ring_.ring_fd;}
150 
151  private:
152  enum { WAIT_SECTION_STATE = 1UL << 31 };
153 
154  void Init(size_t ring_size, int wq_fd = -1);
155 
156  void WakeRing();
157 
158  void WakeupIfNeeded() {
159  auto current = tq_seq_.fetch_add(2, std::memory_order_relaxed);
160  if (current == WAIT_SECTION_STATE) {
161  WakeRing();
162  }
163  }
164 
165  void DispatchCompletions(io_uring_cqe* cqes, unsigned count);
166 
167  template <typename Func> bool EmplaceTaskQueue(Func&& f) {
168  if (task_queue_.try_enqueue(std::forward<Func>(f))) {
169  WakeupIfNeeded();
170 
171  return true;
172  }
173  return false;
174  }
175 
176  void RegrowCentries();
177 
178  io_uring ring_;
179  pthread_t thread_id_ = 0U;
180 
181  int wake_fd_;
182  bool is_stopped_ = true;
183  uint8_t fast_poll_f_ : 1;
184  uint8_t reseved_f_ : 7;
185 
186  // We use fu2 function to allow moveable semantics.
187  using Tasklet =
188  fu2::function_base<true /*owns*/, false /*non-copyable*/, fu2::capacity_default,
189  false /* non-throwing*/, false /* strong exceptions guarantees*/, void()>;
190  static_assert(sizeof(Tasklet) == 32, "");
191 
192  using FuncQ = base::mpmc_bounded_queue<Tasklet>;
193 
194  using EventCount = fibers_ext::EventCount;
195 
196  FuncQ task_queue_;
197  std::atomic_uint32_t tq_seq_{0}, tq_wakeups_{0};
198  EventCount task_queue_avail_, sqe_avail_;
199  ::boost::fibers::context* main_loop_ctx_ = nullptr;
200 
201  friend class UringFiberAlgo;
202 
203  struct CompletionEntry {
204  CbType cb;
205 
206  // serves for linked list management when unused. Also can store an additional payload
207  // field when in flight.
208  int32_t val = -1;
209  int32_t opcode = -1; // For debugging. TODO: to remove later.
210  };
211  static_assert(sizeof(CompletionEntry) == 40, "");
212 
213  std::vector<CompletionEntry> centries_;
214  int32_t next_free_ = -1;
215 
216  struct TLInfo {
217  bool is_proactor_thread = false;
218  uint32_t proactor_index = 0;
219  uint64_t monotonic_time = 0;
220  };
221  static thread_local TLInfo tl_info_;
222 };
223 
224 
225 
226 // Implementation
227 // **********************************************************************
228 //
229 template <typename Func> void Proactor::AsyncBrief(Func&& f) {
230  if (EmplaceTaskQueue(std::forward<Func>(f)))
231  return;
232 
233  while (true) {
234  EventCount::Key key = task_queue_avail_.prepareWait();
235 
236  if (EmplaceTaskQueue(std::forward<Func>(f))) {
237  break;
238  }
239  task_queue_avail_.wait(key.epoch());
240  }
241 }
242 
243 template <typename Func> auto Proactor::AwaitBrief(Func&& f) -> decltype(f()) {
244  if (InMyThread()) {
245  return f();
246  }
247  if (IsProactorThread()) {
248  // TODO:
249  }
250 
251  fibers_ext::Done done;
252  using ResultType = decltype(f());
254 
255  // Store done-ptr by value to increase the refcount while lambda is running.
256  AsyncBrief([&mover, f = std::forward<Func>(f), done]() mutable {
257  mover.Apply(f);
258  done.Notify();
259  });
260 
261  done.Wait();
262  return std::move(mover).get();
263 }
264 
265 // Runs possibly awating function 'f' safely in ContextThread and waits for it to finish,
266 // If we are in the context thread already, runs 'f' directly, otherwise
267 // runs it wrapped in a fiber. Should be used instead of 'Await' when 'f' itself
268 // awaits on something.
269 // To summarize: 'f' should not block its thread, but allowed to block its fiber.
270 template <typename Func> auto Proactor::AwaitBlocking(Func&& f) -> decltype(f()) {
271  if (InMyThread()) {
272  return f();
273  }
274 
275  using ResultType = decltype(f());
277  auto fb = LaunchFiber([&] { mover.Apply(std::forward<Func>(f)); });
278  fb.join();
279 
280  return std::move(mover).get();
281 }
282 
283 } // namespace uring
284 } // namespace util
void Run(unsigned ring_depth=512, int wq_fd=-1)
Definition: proactor.cc:151
void AsyncBrief(Func &&brief)
Definition: proactor.h:229
SubmitEntry GetSubmitEntry(CbType cb, int64_t payload)
Get the Submit Entry object in order to issue I/O request.
Definition: proactor.cc:378
auto AwaitBrief(Func &&brief) -> decltype(brief())
Similar to AsyncBrief, but waits for 'f' to return.
Definition: proactor.h:243
void Stop()
Signals proactor to stop. Does not wait for it.
Definition: proactor.cc:146
void AsyncFiber(Func &&f, Args &&... args)
Definition: proactor.h:117
bool InMyThread() const
Returns true if the caller is running in this Proactor thread.
Definition: proactor.h:69