// event_count.h
1 // Copyright 2019, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 // Based on the design of folly event_count which in turn based on
5 // Dmitry Vyukov's proposal at
6 // https://software.intel.com/en-us/forums/intel-threading-building-blocks/topic/299245
#pragma once

#include <atomic>
#include <cstddef>
#include <cstdint>

#include <boost/fiber/context.hpp>

#include "base/macros.h"
12 
13 namespace util {
14 namespace fibers_ext {
15 
16 // This class is all about reducing the contention on the producer side (notifications).
17 // We want notifications to be as light as possible, while waits are less important
18 // since they on the path of being suspended anyway. However, we also want to reduce number of
19 // spurious waits on the consumer side.
20 // This class has another wonderful property: notification thread does not need to lock mutex,
21 // which means it can be used from the io_context (ring0) fiber.
22 class EventCount {
23  using spinlock_lock_t = ::boost::fibers::detail::spinlock_lock;
24  using wait_queue_t = ::boost::fibers::context::wait_queue_t;
25 
27  // and fibers lib supports only this type for that.
28  ::boost::fibers::detail::spinlock wait_queue_splk_{};
29  wait_queue_t wait_queue_{};
30 
31  public:
32  EventCount() noexcept : val_(0) {}
33 
34  class Key {
35  friend class EventCount;
36  EventCount* me_;
37  uint32_t epoch_;
38 
39  explicit Key(EventCount* me, uint32_t e) noexcept : me_(me), epoch_(e) {}
40 
41  Key(const Key&) = delete;
42 
43  public:
44  Key(Key&&) noexcept = default;
45 
46  ~Key() {
47  // memory_order_relaxed would suffice for correctness, but the faster
48  // #waiters gets to 0, the less likely it is that we'll do spurious wakeups
49  // (and thus system calls).
50  me_->val_.fetch_sub(kAddWaiter, std::memory_order_seq_cst);
51  }
52 
53  uint32_t epoch() const { return epoch_; }
54  };
55 
56  // Return true if a notification was made, false if no notification was issued.
57  bool notify() noexcept;
58 
59  bool notifyAll() noexcept;
60 
61  Key prepareWait() noexcept {
62  uint64_t prev = val_.fetch_add(kAddWaiter, std::memory_order_acq_rel);
63  return Key(this, prev >> kEpochShift);
64  }
65 
66  void wait(uint32_t epoch) noexcept;
67 
72  template <typename Condition> bool await(Condition condition);
73 
74  private:
75  friend class Key;
76 
77  static bool should_switch(::boost::fibers::context* ctx, std::intptr_t expected) {
78  return ctx->twstatus.compare_exchange_strong(expected, static_cast<std::intptr_t>(-1),
79  std::memory_order_acq_rel) ||
80  expected == 0;
81  }
82 
83  EventCount(const EventCount&) = delete;
84  EventCount(EventCount&&) = delete;
85  EventCount& operator=(const EventCount&) = delete;
86  EventCount& operator=(EventCount&&) = delete;
87 
88  // This requires 64-bit
89  static_assert(sizeof(uint32_t) == 4, "bad platform");
90  static_assert(sizeof(uint64_t) == 8, "bad platform");
91 
92  // val_ stores the epoch in the most significant 32 bits and the
93  // waiter count in the least significant 32 bits.
94  std::atomic<uint64_t> val_;
95 
96  static constexpr uint64_t kAddWaiter = uint64_t(1);
97 
98  static constexpr size_t kEpochShift = 32;
99  static constexpr uint64_t kAddEpoch = uint64_t(1) << kEpochShift;
100  static constexpr uint64_t kWaiterMask = kAddEpoch - 1;
101 };
102 
// Advances the epoch and wakes at most one waiting fiber.
// Returns true when the waiter count was non-zero at the moment the epoch was
// bumped (someone was registered via prepareWait), false otherwise.
// The no-waiters fast path touches only the atomic — no lock is taken, which
// is what makes this safe to call from the io_context (ring0) fiber.
inline bool EventCount::notify() noexcept {
  uint64_t prev = val_.fetch_add(kAddEpoch, std::memory_order_release);

  if (UNLIKELY(prev & kWaiterMask)) {
    auto* active_ctx = ::boost::fibers::context::active();

    /*
      lk makes sure that when a waiting thread is entered the critical section in
      EventCount::wait, it atomically checks val_ when entering the WAIT state.
      We need it in order to make sure that cnd_.notify() is not called before the waiting
      thread enters WAIT state and thus the notification is missed.
    */
    spinlock_lock_t lk{wait_queue_splk_};
    while (!wait_queue_.empty()) {
      auto* ctx = &wait_queue_.front();
      wait_queue_.pop_front();

      // Only the winner of the twstatus exchange in should_switch may schedule
      // ctx. Unlock first so the scheduler handoff happens outside the lock.
      if (should_switch(ctx, reinterpret_cast<std::intptr_t>(this))) {
        // notify context
        lk.unlock();
        active_ctx->schedule(ctx);
        break;
      }
    }

    return true;
  }
  return false;
}
132 
133 inline bool EventCount::notifyAll() noexcept {
134  uint64_t prev = val_.fetch_add(kAddEpoch, std::memory_order_release);
135 
136  if (UNLIKELY(prev & kWaiterMask)) {
137  auto* active_ctx = ::boost::fibers::context::active();
138 
139  spinlock_lock_t lk{wait_queue_splk_};
140  wait_queue_t tmp;
141  tmp.swap(wait_queue_);
142  lk.unlock();
143 
144  while (!tmp.empty()) {
145  ::boost::fibers::context* ctx = &tmp.front();
146  tmp.pop_front();
147 
148  if (should_switch(ctx, reinterpret_cast<std::intptr_t>(this))) {
149  // notify context
150  active_ctx->schedule(ctx);
151  }
152  }
153  }
154 
155  return false;
156 };
157 
// Atomically checks for epoch and waits on cond_var.
// Suspends the calling fiber until the epoch moves past `epoch`; returns
// immediately if a notification already advanced the epoch after prepareWait().
inline void EventCount::wait(uint32_t epoch) noexcept {
  // Cheap pre-check outside the lock: if the epoch already moved, no wait.
  if ((val_.load(std::memory_order_acquire) >> kEpochShift) != epoch)
    return;

  auto* active_ctx = ::boost::fibers::context::active();

  // The spinlock pairs with the one in notify()/notifyAll(): the epoch
  // re-check and the enqueue of this fiber happen atomically with respect to
  // a concurrent notifier, so a notification cannot slip in between and be
  // missed.
  spinlock_lock_t lk{wait_queue_splk_};
  if ((val_.load(std::memory_order_acquire) >> kEpochShift) == epoch) {
    // atomically call lk.unlock() and block on *this
    // store this fiber in waiting-queue
    active_ctx->wait_link(wait_queue_);
    // twstatus = 0 marks this context as parked; notifiers race on it via
    // should_switch's compare_exchange before scheduling us.
    active_ctx->twstatus.store(static_cast<std::intptr_t>(0), std::memory_order_release);

    // suspend this fiber; suspend(lk) releases the spinlock as part of parking.
    active_ctx->suspend(lk);
  }
}
176 
177 // Returns true if had to preempt, false if no preemption happenned.
178 template <typename Condition> bool EventCount::await(Condition condition) {
179  if (condition())
180  return false; // fast path
181 
182  // condition() is the only thing that may throw, everything else is
183  // noexcept, Key destructor makes sure to cancelWait state when exiting the function.
184  bool preempt = false;
185  while (true) {
186  Key key = prepareWait();
187  if (condition()) {
188  break;
189  }
190  preempt = true;
191  wait(key.epoch());
192  }
193  return preempt;
194 }
195 
196 } // namespace fibers_ext
197 } // namespace util