io_context.h
1 // Copyright 2018, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 
5 #pragma once
6 
7 #include <boost/asio/io_context.hpp>
8 #include <boost/asio/post.hpp>
9 
10 #include <thread>
11 
12 #include "util/fibers/fibers_ext.h"
13 
14 namespace util {
15 
16 class IoFiberPropertiesMgr;
17 
18 class IoFiberProperties : public boost::fibers::fiber_properties {
19  friend class IoFiberPropertiesMgr;
20 
21  public:
22  constexpr static unsigned MAX_NICE_LEVEL = 4;
23  constexpr static unsigned NUM_NICE_LEVELS = MAX_NICE_LEVEL + 1;
24 
25  IoFiberProperties(::boost::fibers::context* ctx) : fiber_properties(ctx), nice_(2) {}
26 
27  unsigned nice_level() const { return nice_; }
28 
29  // Call this method to alter nice, because we must notify
30  // nice_scheduler of any change.
31  // Currently supported levels are 0-MAX_NICE_LEVEL.
32  // 0 - has the highest responsiveness and MAX_NICE_LEVEL has the least.
33  // Values higher than MAX_NICE_LEVEL will be set to MAX_NICE_LEVEL.
34  void SetNiceLevel(unsigned p);
35 
36  void set_name(std::string nm) { name_ = std::move(nm); }
37 
38  const std::string& name() const { return name_; }
39 
42  uint64_t resume_ts() const { return resume_ts_; }
43 
44  uint64_t awaken_ts() const { return awaken_ts_; }
45 
46  private:
47  std::string name_;
48  uint64_t resume_ts_ = 0, awaken_ts_ = 0;
49  unsigned nice_;
50 };
51 
52 namespace asio_ext {
53 
54 // Runs `f` asynchronously in io-context fiber. `f` should not block, lock on mutexes or Await.
55 // Spinlocks are ok but might cause performance degradation.
56 template <typename Func> void Async(::boost::asio::io_context& cntx, Func&& f) {
57  ::boost::asio::post(cntx, std::forward<Func>(f));
58 }
59 
60 // Similarly to Async(), runs 'f' in io_context thread, but waits for it to finish by blocking
61 // the calling fiber. `f` should not block because it runs directly from IO loop.
62 template <typename Func>
63 auto Await(::boost::asio::io_context& cntx, Func&& f) -> decltype(f()) {
64  // See Done class for more extensive documentation.
65  fibers_ext::Done done;
66  using ResultType = decltype(f());
67  detail::ResultMover<ResultType> mover;
68 
69  // Store done-ptr by value to increase the refcount while lambda is running.
70  Async(cntx, [&, f = std::forward<Func>(f), done]() mutable {
71  mover.Apply(f);
72  done.Notify();
73  });
74 
75  done.Wait();
76  return std::move(mover).get();
77 }
78 
79 }; // namespace asio_ext
80 
81 class IoContext {
82  friend class IoContextPool;
83 
84  public:
85  using io_context = ::boost::asio::io_context;
86 
87  class Cancellable {
88  public:
89  virtual ~Cancellable() {}
90 
91  virtual void Run() = 0;
92  virtual void Cancel() = 0;
93  };
94 
95  IoContext() : context_ptr_(std::make_shared<io_context>()) {}
96 
97  // We use shared_ptr because of the shared ownership with the fibers scheduler.
98  typedef std::shared_ptr<io_context> ptr_t;
99 
100  void Stop();
101 
102  io_context& raw_context() { return *context_ptr_; }
103 
104  template <typename Func> void Async(Func&& f) {
105  asio_ext::Async(*context_ptr_, std::forward<Func>(f));
106  }
107 
108  template <typename Func, typename... Args> void AsyncFiber(Func&& f, Args&&... args) {
109  // Ideally we want to forward args into lambda but it's too complicated before C++20.
110  // So I just copy them into capture.
111  // We forward captured variables so we need lambda to be mutable.
112  Async([f = std::forward<Func>(f), args...]() mutable {
113  ::boost::fibers::fiber(std::forward<Func>(f), std::forward<Args>(args)...).detach();
114  });
115  }
116 
117  // Similar to asio_ext::Await(), but if we call Await from the context thread,
118  // runs `f` directly (minor optimization).
119  template <typename Func> auto Await(Func&& f) -> decltype(f()) {
120  if (InContextThread()) {
121  return f();
122  }
123  return asio_ext::Await(*context_ptr_, std::forward<Func>(f));
124  }
125 
126  // Please note that this function uses Await, therefore can not be used inside Ring0
127  // (i.e. Async callbacks).
128  template <typename... Args> boost::fibers::fiber LaunchFiber(Args&&... args) {
129  ::boost::fibers::fiber fb;
130  // It's safe to use & capture since we await before returning.
131  Await([&] { fb = boost::fibers::fiber(std::forward<Args>(args)...); });
132  return fb;
133  }
134 
135  // Runs possibly awating function 'f' safely in ContextThread and waits for it to finish,
136  // If we are in the context thread already, runs 'f' directly, otherwise
137  // runs it wrapped in a fiber. Should be used instead of 'Await' when 'f' itself
138  // awaits on something.
139  // To summarize: 'f' should not block its thread, but allowed to block its fiber.
140  template <typename Func> auto AwaitSafe(Func&& f) -> decltype(f()) {
141  if (InContextThread()) {
142  return f();
143  }
144 
145  using ResultType = decltype(f());
146  detail::ResultMover<ResultType> mover;
147  auto fb = LaunchFiber([&] { mover.Apply(std::forward<Func>(f)); });
148  fb.join();
149 
150  return std::move(mover).get();
151  }
152 
153  auto get_executor() { return context_ptr_->get_executor(); }
154 
155  bool InContextThread() const { return std::this_thread::get_id() == thread_id_; }
156 
157  // Attaches user processes that should live along IoContext. IoContext will shut them down via
158  // Cancel() call right before closing its IO loop.
159  // Takes ownership over Cancellable runner. Runs it in a dedicated fiber in IoContext thread.
160  // During the shutdown process signals the object to cancel by running Cancellable::Cancel()
161  // method.
162  void AttachCancellable(Cancellable* obj) {
163  auto fb = LaunchFiber([obj] { obj->Run(); });
164  cancellable_arr_.emplace_back(
165  CancellablePair{std::unique_ptr<Cancellable>(obj), std::move(fb)});
166  }
167 
168  private:
169  void StartLoop(fibers_ext::BlockingCounter* bc);
170 
171  using CancellablePair = std::pair<std::unique_ptr<Cancellable>, ::boost::fibers::fiber>;
172 
173  ptr_t context_ptr_;
174  std::thread::id thread_id_;
175  std::vector<CancellablePair> cancellable_arr_;
176 };
177 
178 } // namespace util
A pool of IoContext objects, representing and managing CPU resources of the system.
uint64_t resume_ts() const
Definition: io_context.h:42