6 #include <boost/fiber/channel_op_status.hpp> 7 #include <boost/fiber/condition_variable.hpp> 8 #include <experimental/optional> 11 #include "util/fibers/event_count.h" 15 ostream& operator<<(ostream& o, const ::boost::fibers::channel_op_status op);
21 enum DoneWaitDirective {
26 namespace fibers_ext {
// Maps a fiber context pointer to a 16-bit id. The pointer is converted to
// uintptr_t and then implicitly narrowed by the uint16_t return type, so only
// the low 16 bits are kept and distinct contexts may share an id.
28 inline uint16_t short_id(::boost::fibers::context* ctx) {
29 return reinterpret_cast<uintptr_t>(ctx);
// Convenience overload: short id of the context returned by
// ::boost::fibers::context::active().
32 inline uint16_t short_id() {
33 return short_id(::boost::fibers::context::active());
// Default constructor: starts with the ready flag cleared.
48 Impl() : ready_(
false) {}
// Copy construction and copy assignment are both deleted.
// NOTE(review): presumably because instances are shared through the intrusive
// reference count rather than copied — confirm.
49 Impl(
const Impl&) =
delete;
50 void operator=(
const Impl&) =
delete;
// boost::intrusive_ptr hook: takes one additional reference on `done`.
// The increment uses relaxed ordering.
52 friend void intrusive_ptr_add_ref(Impl* done) noexcept {
53 done->use_count_.fetch_add(1, std::memory_order_relaxed);
// boost::intrusive_ptr hook: drops one reference on `impl`.
// fetch_sub returns the previous value, so the branch is entered only by the
// caller that gave up the last reference.
56 friend void intrusive_ptr_release(Impl* impl) noexcept {
57 if (1 == impl->use_count_.fetch_sub(1, std::memory_order_release)) {
// Acquire fence paired with the release decrements above.
// NOTE(review): the rest of this branch is not visible in this excerpt
// (presumably it destroys `impl`) — confirm.
67 std::atomic_thread_fence(std::memory_order_acquire);
// Calls ec_.await with a predicate that reads ready_ using acquire ordering and
// keeps the value returned by ec_.await in `res`.
// NOTE(review): presumably ec_.await suspends the caller until the predicate
// holds — confirm against the event-count implementation.
// When `reset` equals AND_RESET the flag is cleared again (release store).
// NOTE(review): the tail of this function is not visible in this excerpt.
72 bool Wait(DoneWaitDirective reset) {
73 bool res = ec_.await([
this] {
return ready_.load(std::memory_order_acquire); });
74 if (reset == AND_RESET)
75 ready_.store(
false, std::memory_order_release);
// Sets the ready flag with release ordering; the wait predicate and IsReady()
// read the same flag with acquire loads.
// NOTE(review): the enclosing function header is not visible in this excerpt
// (likely Notify()) — confirm.
81 ready_.store(
true, std::memory_order_release);
// Clears the ready flag. Plain assignment to a std::atomic_bool is a
// sequentially consistent store, unlike the explicit release stores used for
// the other visible writes to ready_.
85 void Reset() { ready_ =
false; }
// Returns the current value of the ready flag (acquire load).
87 bool IsReady()
const {
return ready_.load(std::memory_order_acquire); }
// Intrusive reference count; starts at 0 and is adjusted by
// intrusive_ptr_add_ref / intrusive_ptr_release.
91 std::atomic<std::uint32_t> use_count_{0};
// Ready flag; set to false by the default constructor.
92 std::atomic_bool ready_;
// Handle type: a boost::intrusive_ptr to Impl.
94 using ptr_t = ::boost::intrusive_ptr<Impl>;
// Allocates a fresh Impl and stores it in impl_.
97 Done() : impl_(
new Impl) {}
// Forwards to Notify() on the shared implementation object.
100 void Notify() { impl_->Notify(); }
// Forwards to Wait(reset) on the shared implementation object and returns its
// result. `reset` defaults to AND_NOTHING.
101 bool Wait(DoneWaitDirective reset = AND_NOTHING) {
return impl_->Wait(reset); }
// Forwards to Reset() on the shared implementation object.
103 void Reset() { impl_->Reset(); }
// Initializes the counter to `count`.
// The constructor is not marked explicit, so an unsigned value converts
// implicitly to Impl.
112 Impl(
unsigned count) : count_(count) {}
// Copy construction and copy assignment are both deleted.
113 Impl(
const Impl&) =
delete;
114 void operator=(
const Impl&) =
delete;
// boost::intrusive_ptr hook: takes one additional reference on `done`
// (relaxed increment of use_count_).
116 friend void intrusive_ptr_add_ref(Impl* done) noexcept {
117 done->use_count_.fetch_add(1, std::memory_order_relaxed);
// boost::intrusive_ptr hook: drops one reference on `impl`.
// fetch_sub returns the previous value, so the branch is entered only when the
// last reference is released; the acquire fence pairs with the release
// decrements.
// NOTE(review): the rest of the branch is not visible in this excerpt
// (presumably it destroys `impl`) — confirm.
120 friend void intrusive_ptr_release(Impl* impl) noexcept {
121 if (1 == impl->use_count_.fetch_sub(1, std::memory_order_release)) {
122 std::atomic_thread_fence(std::memory_order_acquire);
// Calls ec_.await with a predicate that becomes true once count_ reads 0
// (acquire load).
// NOTE(review): the enclosing function header is not visible in this excerpt
// (likely Wait()) — confirm.
129 ec_.await([
this] {
return 0 == count_.load(std::memory_order_acquire); });
// fetch_sub returns the previous value, so this condition holds only for the
// decrement that takes count_ from 1 to 0.
// NOTE(review): the body of this `if` and the enclosing function header are
// not visible in this excerpt (likely Dec()) — confirm.
133 if (1 == count_.fetch_sub(1, std::memory_order_acq_rel))
// Intrusive reference count; starts at 0.
140 std::atomic<std::uint32_t> use_count_{0};
// Outstanding count; initialized from the constructor argument and compared
// against 0 by the wait predicate.
141 std::atomic_long count_;
// Handle type: a boost::intrusive_ptr to Impl.
143 using ptr_t = ::boost::intrusive_ptr<Impl>;
// Forwards to Dec() on the shared implementation object.
148 void Dec() { impl_->Dec(); }
// Forwards to Wait() on the shared implementation object.
150 void Wait() { impl_->Wait(); }
// Adds `delta` to the shared counter with an acq_rel read-modify-write.
// This accesses impl_->count_ directly rather than going through an Impl
// method.
152 void Add(
unsigned delta) { impl_->count_.fetch_add(delta, std::memory_order_acq_rel); }
// Acquires mutex_ through a std::unique_lock. `nr` defaults to 1.
// NOTE(review): the rest of this function is not visible in this excerpt.
162 void Wait(uint32_t nr = 1) {
163 std::unique_lock<::boost::fibers::mutex> lock(mutex_);
// Acquires mutex_ through a std::unique_lock. `nr` defaults to 1.
// NOTE(review): the rest of this function is not visible in this excerpt.
167 void Signal(uint32_t nr = 1) {
168 std::unique_lock<::boost::fibers::mutex> lock(mutex_);
// Waits on cond_ using the caller-supplied lock `l` until count_ >= nr.
// The predicate captures by reference, so count_ and nr are re-read on every
// evaluation. `nr` defaults to 1.
// NOTE(review): the rest of this function is not visible in this excerpt.
175 template <
typename Lock>
void Wait(Lock& l, uint32_t nr = 1) {
176 cond_.wait(l, [&] {
return count_ >= nr; });
// Condition variable waited on in Wait(Lock&, nr).
181 ::boost::fibers::condition_variable_any cond_;
// Mutex locked by Wait(nr) and Signal(nr).
182 ::boost::fibers::mutex mutex_;
// Waits on `cv` until `pred` returns true; `pred` is perfectly forwarded into
// cv.wait.
// NOTE(review): the declaration of `lock` is not visible in this excerpt.
192 template <
typename Pred>
void Await(::boost::fibers::condition_variable_any& cv, Pred&& pred) {
194 cv.wait(lock, std::forward<Pred>(pred));
// Cell<T>: pairs an optional value of type T with a fiber condition variable.
202 template <
typename T>
class Cell {
// The stored value; disengaged when the cell is empty.
203 std::experimental::optional<T> val_;
// Condition variable passed to fibers_ext::Await by the member functions.
204 ::boost::fibers::condition_variable_any cv_;
// Returns true when val_ holds no value.
207 bool IsEmpty()
const {
return !bool(val_); }
// Awaits on cv_ until the cell is empty, then constructs the stored value in
// place from `val`.
// T is the class template parameter (not deduced here), so for a non-reference
// T the parameter is an rvalue reference and std::forward<T> acts as a move.
// NOTE(review): the rest of this function is not visible in this excerpt.
210 void Emplace(T&& val) {
211 fibers_ext::Await(cv_, [
this] {
return IsEmpty(); });
212 val_.emplace(std::forward<T>(val));
// Awaits on cv_ until the cell holds a value.
// NOTE(review): the end of this function is not visible in this excerpt.
216 void WaitTillFull() {
217 fibers_ext::Await(cv_, [
this] {
return !IsEmpty(); });
// Empties the cell by resetting val_ to nullopt.
// NOTE(review): the enclosing function is not visible in this excerpt.
225 val_ = std::experimental::nullopt;
// Invokes `f` with no arguments and stores its result in r_.
// `f` is called as an lvalue; it is not forwarded.
237 template <
typename Func>
void Apply(Func&& f) { r_ = f(); }
// Rvalue-qualified accessor: callable only on an rvalue of this object.
// Returns r_ cast through std::forward<R>.
244 R&& get() && {
return std::forward<R>(r_); }
// Invokes `f` with no arguments and ignores any result.
// NOTE(review): presumably the void-result counterpart of the Apply overload
// that stores into r_ — confirm.
249 template <
typename Func>
void Apply(Func&& f) { f(); }