#include <atomic>
#include <cstdint>

#include <boost/fiber/context.hpp>

#include "base/macros.h"

namespace fibers_ext {
// EventCount keeps a single 64-bit atomic word: the upper 32 bits hold an
// "epoch" counter that is bumped on every notification, and the lower 32 bits
// count the fibers that are currently preparing to wait. Waiting fibers park
// themselves on wait_queue_ and are rescheduled by notify()/notifyAll().
class EventCount {
  using spinlock_lock_t = ::boost::fibers::detail::spinlock_lock;
  using wait_queue_t = ::boost::fibers::context::wait_queue_t;

  // Guards wait_queue_ against concurrent access from waiters and notifiers.
  ::boost::fibers::detail::spinlock wait_queue_splk_{};
  wait_queue_t wait_queue_{};
 public:
  EventCount() noexcept : val_(0) {}

  // RAII token returned by prepareWait(); while it is alive the caller is
  // counted as a waiter, and its destructor removes that registration.
  class Key {
    friend class EventCount;

    EventCount* me_;
    uint32_t epoch_;

    explicit Key(EventCount* me, uint32_t e) noexcept : me_(me), epoch_(e) {}

    Key(const Key&) = delete;

   public:
    Key(Key&&) noexcept = default;
    ~Key() {
      // seq_cst makes the waiter-count decrement visible to notifiers as early
      // as possible, which reduces the chance of spurious wakeup attempts.
      me_->val_.fetch_sub(kAddWaiter, std::memory_order_seq_cst);
    }
    uint32_t epoch() const { return epoch_; }
  };
  // Bumps the epoch and wakes one waiter (notify) or all waiters (notifyAll).
  // Returns true if any waiters were registered at the time of the call.
  bool notify() noexcept;

  bool notifyAll() noexcept;
  // Registers the caller as a waiter and snapshots the current epoch. The
  // returned Key must stay alive until the caller has either observed its
  // condition or finished wait().
  Key prepareWait() noexcept {
    uint64_t prev = val_.fetch_add(kAddWaiter, std::memory_order_acq_rel);
    return Key(this, prev >> kEpochShift);
  }
  // Suspends the calling fiber if the epoch is still `epoch`, until a
  // notification arrives.
  void wait(uint32_t epoch) noexcept;
  // Blocks until condition() returns true. Returns true if the fiber had to
  // suspend at least once, false if the fast path succeeded immediately.
  template <typename Condition> bool await(Condition condition);
 private:
  // Follows the twstatus handshake used by Boost.Fiber's own condition
  // variables: schedule the context if we win the CAS against a concurrent
  // timed wait, or if twstatus is 0 (no timed wait is involved).
  static bool should_switch(::boost::fibers::context* ctx, std::intptr_t expected) {
    return ctx->twstatus.compare_exchange_strong(expected, static_cast<std::intptr_t>(-1),
                                                 std::memory_order_acq_rel) ||
           expected == 0;
  }
  EventCount(const EventCount&) = delete;
  EventCount(EventCount&&) = delete;
  EventCount& operator=(const EventCount&) = delete;
  EventCount& operator=(EventCount&&) = delete;
  static_assert(sizeof(uint32_t) == 4, "bad platform");
  static_assert(sizeof(uint64_t) == 8, "bad platform");
  // Upper 32 bits: epoch counter. Lower 32 bits: number of registered waiters.
  std::atomic<uint64_t> val_;
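  // Worked example of the packing above (values are hypothetical): with 3
  // waiters registered at epoch 7, val_ == (uint64_t(7) << kEpochShift) | 3,
  // so `val_ >> kEpochShift == 7` and `val_ & kWaiterMask == 3`. notify()
  // adds kAddEpoch, prepareWait() adds kAddWaiter.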
  static constexpr uint64_t kAddWaiter = uint64_t(1);

  static constexpr size_t kEpochShift = 32;
  static constexpr uint64_t kAddEpoch = uint64_t(1) << kEpochShift;
  static constexpr uint64_t kWaiterMask = kAddEpoch - 1;
};
inline bool EventCount::notify() noexcept {
  uint64_t prev = val_.fetch_add(kAddEpoch, std::memory_order_release);

  if (UNLIKELY(prev & kWaiterMask)) {
    auto* active_ctx = ::boost::fibers::context::active();

    spinlock_lock_t lk{wait_queue_splk_};
    while (!wait_queue_.empty()) {
      auto* ctx = &wait_queue_.front();
      wait_queue_.pop_front();

      if (should_switch(ctx, reinterpret_cast<std::intptr_t>(this))) {
        // Wake exactly one waiter.
        active_ctx->schedule(ctx);
        break;
      }
    }
    return true;
  }
  return false;
}
inline bool EventCount::notifyAll() noexcept {
  uint64_t prev = val_.fetch_add(kAddEpoch, std::memory_order_release);

  if (UNLIKELY(prev & kWaiterMask)) {
    auto* active_ctx = ::boost::fibers::context::active();

    spinlock_lock_t lk{wait_queue_splk_};
    // Detach the whole queue under the lock, then wake everyone outside of it.
    wait_queue_t tmp;
    tmp.swap(wait_queue_);
    lk.unlock();

    while (!tmp.empty()) {
      ::boost::fibers::context* ctx = &tmp.front();
      tmp.pop_front();

      if (should_switch(ctx, reinterpret_cast<std::intptr_t>(this))) {
        active_ctx->schedule(ctx);
      }
    }
    return true;
  }
  return false;
}
inline void EventCount::wait(uint32_t epoch) noexcept {
  // Fast path: a notification already advanced the epoch; nothing to wait for.
  if ((val_.load(std::memory_order_acquire) >> kEpochShift) != epoch)
    return;

  auto* active_ctx = ::boost::fibers::context::active();

  spinlock_lock_t lk{wait_queue_splk_};
  // Re-check the epoch under the lock to avoid a lost wakeup.
  if ((val_.load(std::memory_order_acquire) >> kEpochShift) == epoch) {
    // Park the current fiber on the wait queue and suspend it; suspend()
    // releases the spinlock together with blocking.
    active_ctx->wait_link(wait_queue_);
    active_ctx->twstatus.store(static_cast<std::intptr_t>(0), std::memory_order_release);
    active_ctx->suspend(lk);
  }
}
// Returns true if the fiber had to suspend, false if no preemption happened.
template <typename Condition> bool EventCount::await(Condition condition) {
  if (condition())
    return false;  // fast path

  // condition() is the only expression that may throw; the Key destructor
  // unregisters the waiter even if an exception propagates.
  bool preempt = false;
  while (true) {
    Key key = prepareWait();
    if (condition())
      break;
    preempt = true;
    wait(key.epoch());
  }
  return preempt;
}

}  // namespace fibers_ext
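// Usage sketch (illustrative only; `queue_` and the producer/consumer split
// below are hypothetical and assume external synchronization of the queue):
//
//   fibers_ext::EventCount ec;
//   std::deque<int> queue_;
//
//   // Consumer fiber: await() re-checks the condition around each suspension,
//   // so a notification between the check and the wait is never lost.
//   int Consume() {
//     int item = 0;
//     ec.await([&] {
//       if (queue_.empty()) return false;
//       item = queue_.front();
//       queue_.pop_front();
//       return true;
//     });
//     return item;
//   }
//
//   // Producer: publish the data first, then bump the epoch so that any fiber
//   // already parked in wait() gets rescheduled.
//   void Produce(int item) {
//     queue_.push_back(item);
//     ec.notify();
//   }
//
// await() is built on the lower-level pair prepareWait()/wait(): register as a
// waiter, re-check the condition, and only then suspend on the captured epoch.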