fibers_ext.h
// Copyright 2018, Beeri 15. All rights reserved.
// Author: Roman Gershman (romange@gmail.com)
//
#pragma once

#include <boost/fiber/channel_op_status.hpp>
#include <boost/fiber/condition_variable.hpp>
#include <boost/fiber/mutex.hpp>
#include <boost/intrusive_ptr.hpp>

#include <experimental/optional>
#include <mutex>  // for std::unique_lock
#include <ostream>

#include "util/fibers/event_count.h"

namespace std {

ostream& operator<<(ostream& o, const ::boost::fibers::channel_op_status op);

}  // namespace std
namespace util {

enum DoneWaitDirective {
  AND_NOTHING = 0,
  AND_RESET = 1
};

namespace fibers_ext {

// Short id derived from the fiber context pointer; the truncation to 16 bits is intentional
// (useful for compact logging).
inline uint16_t short_id(::boost::fibers::context* ctx) {
  return static_cast<uint16_t>(reinterpret_cast<uintptr_t>(ctx));
}

inline uint16_t short_id() {
  return short_id(::boost::fibers::context::active());
}

// Wraps the canonical condition_variable + bool flag pattern.
// We cannot synchronize threads with a condition-like variable allocated on the stack:
// it is possible that the main (waiting) thread passes the Wait() call and goes on to
// destroy the "done" flag while the background thread is still accessing it. This could be
// fixed with a mutex, but we want to avoid a mutex so that Notify() stays non-blocking and
// therefore io_context friendly. Hence Done must be a heap-based, reference-counted object.
class Done {
  class Impl {
   public:
    Impl() : ready_(false) {}
    Impl(const Impl&) = delete;
    void operator=(const Impl&) = delete;

    friend void intrusive_ptr_add_ref(Impl* done) noexcept {
      done->use_count_.fetch_add(1, std::memory_order_relaxed);
    }

    friend void intrusive_ptr_release(Impl* impl) noexcept {
      if (1 == impl->use_count_.fetch_sub(1, std::memory_order_release)) {
        // We want to synchronize on all changes to obj performed in other threads.
        // obj is not atomic, but we know that whatever was being written has been written
        // in the other threads and no references to obj exist anymore.
        // Therefore an acquire fence is enough to synchronize.
        // "acquire" requires a release operation to mark the end of the memory changes we wish
        // to acquire, and "fetch_sub(std::memory_order_release)" provides this marker.
        // To summarize: fetch_sub(release) and fence(acquire) are needed to order and
        // synchronize on changes to obj in the most performant way.
        // See: https://stackoverflow.com/q/27751025/
        std::atomic_thread_fence(std::memory_order_acquire);
        delete impl;
      }
    }

    bool Wait(DoneWaitDirective reset) {
      bool res = ec_.await([this] { return ready_.load(std::memory_order_acquire); });
      if (reset == AND_RESET)
        ready_.store(false, std::memory_order_release);
      return res;
    }

    // We use EventCount to wake waiters without blocking.
    void Notify() {
      ready_.store(true, std::memory_order_release);
      ec_.notify();
    }

    void Reset() { ready_ = false; }

    bool IsReady() const { return ready_.load(std::memory_order_acquire); }

   private:
    EventCount ec_;
    std::atomic<std::uint32_t> use_count_{0};
    std::atomic_bool ready_;
  };
  using ptr_t = ::boost::intrusive_ptr<Impl>;

 public:
  Done() : impl_(new Impl) {}
  ~Done() {}

  void Notify() { impl_->Notify(); }
  bool Wait(DoneWaitDirective reset = AND_NOTHING) { return impl_->Wait(reset); }

  void Reset() { impl_->Reset(); }

 private:
  ptr_t impl_;
};
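// Usage sketch (assumes the caller spawns a background fiber; DoWork is a placeholder,
// not part of this library):
//
//   Done done;
//   boost::fibers::fiber worker([done]() mutable {
//     DoWork();        // hypothetical background task
//     done.Notify();   // non-blocking wake-up of the waiter
//   });
//   done.Wait();       // suspends only the calling fiber, not the thread
//   worker.join();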

// A simple latch: Wait() blocks the calling fiber until Dec() has been called 'count' times.
// Like Done, it is heap-based and reference-counted so it can be captured by value in
// fiber and thread callbacks.
class BlockingCounter {
  class Impl {
   public:
    Impl(unsigned count) : count_(count) {}
    Impl(const Impl&) = delete;
    void operator=(const Impl&) = delete;

    friend void intrusive_ptr_add_ref(Impl* done) noexcept {
      done->use_count_.fetch_add(1, std::memory_order_relaxed);
    }

    friend void intrusive_ptr_release(Impl* impl) noexcept {
      if (1 == impl->use_count_.fetch_sub(1, std::memory_order_release)) {
        std::atomic_thread_fence(std::memory_order_acquire);
        delete impl;
      }
    }

    // I suspect all memory order accesses here could be "relaxed" but I do not bother.
    void Wait() {
      ec_.await([this] { return 0 == count_.load(std::memory_order_acquire); });
    }

    void Dec() {
      if (1 == count_.fetch_sub(1, std::memory_order_acq_rel))
        ec_.notify();
    }

   private:
    friend class BlockingCounter;
    EventCount ec_;
    std::atomic<std::uint32_t> use_count_{0};
    std::atomic_long count_;
  };
  using ptr_t = ::boost::intrusive_ptr<Impl>;

 public:
  explicit BlockingCounter(unsigned count) : impl_(new Impl(count)) {}

  void Dec() { impl_->Dec(); }

  void Wait() { impl_->Wait(); }

  void Add(unsigned delta) { impl_->count_.fetch_add(delta, std::memory_order_acq_rel); }

 private:
  ptr_t impl_;
};
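// Usage sketch (hypothetical per-shard fan-out; kNumShards and ProcessShard are
// placeholders used only for illustration):
//
//   BlockingCounter bc(kNumShards);
//   for (unsigned i = 0; i < kNumShards; ++i) {
//     boost::fibers::fiber([bc, i]() mutable {
//       ProcessShard(i);
//       bc.Dec();
//     }).detach();
//   }
//   bc.Wait();   // resumes once Dec() has been called kNumShards times.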

class Semaphore {
 public:
  Semaphore(uint32_t cnt) : count_(cnt) {}

  void Wait(uint32_t nr = 1) {
    std::unique_lock<::boost::fibers::mutex> lock(mutex_);
    Wait(lock, nr);
  }

  void Signal(uint32_t nr = 1) {
    std::unique_lock<::boost::fibers::mutex> lock(mutex_);
    count_ += nr;
    lock.unlock();

    cond_.notify_all();
  }

  template <typename Lock> void Wait(Lock& l, uint32_t nr = 1) {
    cond_.wait(l, [&] { return count_ >= nr; });
    count_ -= nr;
  }

 private:
  ::boost::fibers::condition_variable_any cond_;
  ::boost::fibers::mutex mutex_;
  uint32_t count_;
};
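// Usage sketch (bounds concurrent access to a resource; kMaxInFlight is a placeholder
// constant):
//
//   Semaphore sem(kMaxInFlight);
//   sem.Wait();      // acquire one unit; suspends the calling fiber if none are available
//   // ... use the limited resource ...
//   sem.Signal();    // release the unit and wake waiting fibers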

// For synchronizing fibers in a single-threaded environment.
struct NoOpLock {
  void lock() {}
  void unlock() {}
};

template <typename Pred> void Await(::boost::fibers::condition_variable_any& cv, Pred&& pred) {
  NoOpLock lock;
  cv.wait(lock, std::forward<Pred>(pred));
}
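// Usage sketch: when the producer and consumer fibers run on the same thread, no mutex is
// needed around the flag ('ready' and 'cv' below are illustrative locals, not part of this
// library):
//
//   bool ready = false;
//   ::boost::fibers::condition_variable_any cv;
//   // producer fiber:   ready = true; cv.notify_one();
//   // consumer fiber:   fibers_ext::Await(cv, [&] { return ready; });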

// Single-threaded synchronization primitive between fibers.
// fibers::unbuffered_channel has a problematic design with respect to move semantics and
// its "try_push" method, because it moves the value even when the push does not succeed.
// Therefore, for the single-producer, single-consumer, single-threaded case we can use this
// Cell class to emulate unbuffered_channel.
template <typename T> class Cell {
  std::experimental::optional<T> val_;
  ::boost::fibers::condition_variable_any cv_;

 public:
  bool IsEmpty() const { return !bool(val_); }

  // Might block the calling fiber.
  void Emplace(T&& val) {
    fibers_ext::Await(cv_, [this] { return IsEmpty(); });
    val_.emplace(std::forward<T>(val));
    cv_.notify_one();
  }

  void WaitTillFull() {
    fibers_ext::Await(cv_, [this] { return !IsEmpty(); });
  }

  T& value() {
    return *val_;  // optional stays engaged.
  }

  void Clear() {
    val_ = std::experimental::nullopt;
    cv_.notify_one();
  }
};
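// Usage sketch (producer and consumer fibers running on the same thread):
//
//   Cell<std::string> cell;
//   // producer fiber:
//   cell.Emplace("ping");            // blocks while the cell is still occupied
//   // consumer fiber:
//   cell.WaitTillFull();
//   std::string msg = std::move(cell.value());
//   cell.Clear();                    // frees the cell and unblocks a pending Emplace()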

}  // namespace fibers_ext

namespace detail {

template <typename R> class ResultMover {
  R r_;  // TODO: make this optional to support types without a default constructor.

 public:
  template <typename Func> void Apply(Func&& f) { r_ = f(); }

  // Returning an rvalue reference means returning the same object r_ instead of creating a
  // temporary R{r_}. Note that when returning a function-local object we do not need to
  // return an rvalue, because RVO eliminates the redundant object creation; but for
  // returning the data member r_ it is more efficient.
  // "get() &&" means this function can only be called on an rvalue ResultMover object.
  R&& get() && { return std::forward<R>(r_); }
};

template <> class ResultMover<void> {
 public:
  template <typename Func> void Apply(Func&& f) { f(); }
  void get() {}
};
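// Usage sketch (typically Apply() runs inside a worker fiber and get() is called by the
// waiter after synchronizing, e.g. via fibers_ext::Done):
//
//   detail::ResultMover<std::string> mover;
//   mover.Apply([] { return std::string("result"); });   // stores the callable's result
//   std::string res = std::move(mover).get();            // moves r_ out without copying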

}  // namespace detail
}  // namespace util