#include <boost/fiber/channel_op_status.hpp>
#include <boost/fiber/condition_variable.hpp>

#include <experimental/optional>
#include <ostream>

#include "util/fibers/event_count.h"

namespace std {

ostream& operator<<(ostream& o, const ::boost::fibers::channel_op_status op);

}  // namespace std
namespace util {

// Tells Done::Wait() whether to clear the ready flag after a successful wait.
enum DoneWaitDirective {
  AND_NOTHING = 0,
  AND_RESET = 1
};

namespace fibers_ext {
// Returns a short, truncated id of a fiber context. Intended for logging;
// collisions are possible and acceptable.
inline uint16_t short_id(::boost::fibers::context* ctx) {
  return static_cast<uint16_t>(reinterpret_cast<uintptr_t>(ctx));  // deliberate truncation.
}

inline uint16_t short_id() {
  return short_id(::boost::fibers::context::active());
}
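// Illustrative use (not part of the original header): tag log lines with the
// current fiber, e.g.
//   LOG(INFO) << "[" << short_id() << "] connection accepted";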
// One-shot event that synchronizes a notifier with a waiter without holding a mutex
// in Notify(). The shared state is reference counted, so a Done copy captured by a
// callback stays valid even if the waiter returns and destroys its own copy first.
class Done {
  class Impl {
   public:
    Impl() : ready_(false) {}
    Impl(const Impl&) = delete;
    void operator=(const Impl&) = delete;

    friend void intrusive_ptr_add_ref(Impl* done) noexcept {
      done->use_count_.fetch_add(1, std::memory_order_relaxed);
    }

    friend void intrusive_ptr_release(Impl* impl) noexcept {
      if (1 == impl->use_count_.fetch_sub(1, std::memory_order_release)) {
        // Synchronize with all modifications made by threads that released their
        // references before deleting the object.
        std::atomic_thread_fence(std::memory_order_acquire);
        delete impl;
      }
    }

    // Blocks until Notify() has been called; optionally clears the flag afterwards.
    bool Wait(DoneWaitDirective reset) {
      bool res = ec_.await([this] { return ready_.load(std::memory_order_acquire); });
      if (reset == AND_RESET)
        ready_.store(false, std::memory_order_release);
      return res;
    }

    void Notify() {
      ready_.store(true, std::memory_order_release);
      ec_.notify();
    }

    void Reset() { ready_ = false; }

    bool IsReady() const { return ready_.load(std::memory_order_acquire); }

   private:
    EventCount ec_;
    std::atomic<std::uint32_t> use_count_{0};
    std::atomic_bool ready_;
  };

  using ptr_t = ::boost::intrusive_ptr<Impl>;

 public:
  Done() : impl_(new Impl) {}

  void Notify() { impl_->Notify(); }
  bool Wait(DoneWaitDirective reset = AND_NOTHING) { return impl_->Wait(reset); }
  void Reset() { impl_->Reset(); }

 private:
  ptr_t impl_;
};
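// Usage sketch (illustrative, not part of the original header). `pool` stands for
// any hypothetical executor that runs the callback on another thread or fiber:
//
//   fibers_ext::Done done;
//   pool->Add([done]() mutable {
//     DoWork();
//     done.Notify();          // safe even if the waiter has not reached Wait() yet.
//   });
//   done.Wait(AND_RESET);     // blocks the calling fiber, then re-arms the flag.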
// Counts down from an initial value; Wait() blocks until the count reaches zero.
// The shared state is intrusively reference counted, as in Done.
class BlockingCounter {
  class Impl {
   public:
    Impl(unsigned count) : count_(count) {}
    Impl(const Impl&) = delete;
    void operator=(const Impl&) = delete;

    friend void intrusive_ptr_add_ref(Impl* done) noexcept {
      done->use_count_.fetch_add(1, std::memory_order_relaxed);
    }

    friend void intrusive_ptr_release(Impl* impl) noexcept {
      if (1 == impl->use_count_.fetch_sub(1, std::memory_order_release)) {
        std::atomic_thread_fence(std::memory_order_acquire);
        delete impl;
      }
    }

    void Wait() {
      ec_.await([this] { return 0 == count_.load(std::memory_order_acquire); });
    }

    void Dec() {
      if (1 == count_.fetch_sub(1, std::memory_order_acq_rel))
        ec_.notify();
    }

   private:
    friend class BlockingCounter;

    EventCount ec_;
    std::atomic<std::uint32_t> use_count_{0};
    std::atomic_long count_;
  };

  using ptr_t = ::boost::intrusive_ptr<Impl>;

 public:
  BlockingCounter(unsigned count) : impl_(new Impl(count)) {}

  void Dec() { impl_->Dec(); }
  void Wait() { impl_->Wait(); }
  void Add(unsigned delta) { impl_->count_.fetch_add(delta, std::memory_order_acq_rel); }

 private:
  ptr_t impl_;
};
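// Usage sketch (illustrative, not part of the original header). `Launch` stands
// for any hypothetical way of starting a fiber or thread:
//
//   fibers_ext::BlockingCounter bc(tasks.size());
//   for (auto& task : tasks)
//     Launch([&task, bc]() mutable { task.Run(); bc.Dec(); });
//   bc.Wait();   // returns once every task has called Dec().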
// Fiber-friendly counting semaphore built on Boost.Fibers primitives.
class Semaphore {
 public:
  Semaphore(uint32_t cnt) : count_(cnt) {}

  void Wait(uint32_t nr = 1) {
    std::unique_lock<::boost::fibers::mutex> lock(mutex_);
    Wait(lock, nr);
  }

  void Signal(uint32_t nr = 1) {
    std::unique_lock<::boost::fibers::mutex> lock(mutex_);
    count_ += nr;
    lock.unlock();
    cond_.notify_all();
  }

  template <typename Lock> void Wait(Lock& l, uint32_t nr = 1) {
    cond_.wait(l, [&] { return count_ >= nr; });
    count_ -= nr;
  }

 private:
  ::boost::fibers::condition_variable_any cond_;
  ::boost::fibers::mutex mutex_;
  uint32_t count_;
};
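// Usage sketch (illustrative, not part of the original header): bound the number
// of fibers inside a section to the initial count.
//
//   fibers_ext::Semaphore sem(4);
//   sem.Wait();      // acquires one unit, blocking while none are available.
//   DoBoundedWork();
//   sem.Signal();    // releases the unit and wakes waiters.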
// No-op BasicLockable: fibers running on the same thread never race, so waiting on
// condition_variable_any does not need a real mutex.
struct NoOpLock {
  void lock() {}
  void unlock() {}
};

template <typename Pred>
void Await(::boost::fibers::condition_variable_any& cv, Pred&& pred) {
  NoOpLock lock;
  cv.wait(lock, std::forward<Pred>(pred));
}
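// Usage sketch (illustrative, not part of the original header): a fiber waits for a
// flag flipped by another fiber of the same thread, without any mutex.
//
//   bool ready = false;
//   ::boost::fibers::condition_variable_any cv;
//   // waiting fiber:
//   fibers_ext::Await(cv, [&] { return ready; });
//   // signalling fiber (same thread):
//   ready = true;
//   cv.notify_all();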
// Single-slot mailbox for a producer fiber and a consumer fiber running on the same
// thread (ping-pong exchange, no mutex).
template <typename T> class Cell {
  std::experimental::optional<T> val_;
  ::boost::fibers::condition_variable_any cv_;

 public:
  bool IsEmpty() const { return !bool(val_); }

  // Blocks the calling fiber until the cell is empty, then stores the value and
  // wakes the consumer.
  void Emplace(T&& val) {
    fibers_ext::Await(cv_, [this] { return IsEmpty(); });
    val_.emplace(std::forward<T>(val));
    cv_.notify_one();
  }

  void WaitTillFull() {
    fibers_ext::Await(cv_, [this] { return !IsEmpty(); });
  }

  T& value() { return *val_; }

  // Empties the cell and wakes the producer so it can publish the next item.
  void Clear() {
    val_ = std::experimental::nullopt;
    cv_.notify_one();
  }
};
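// Usage sketch (illustrative, not part of the original header; ReadNextLine and
// Process are placeholders):
//
//   fibers_ext::Cell<std::string> cell;
//   // producer fiber:
//   cell.Emplace(ReadNextLine());   // blocks while the previous item is unread.
//   // consumer fiber:
//   cell.WaitTillFull();
//   Process(cell.value());
//   cell.Clear();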
namespace detail {

// Captures the result of a callable so it can be moved out exactly once.
template <typename R> class ResultMover {
  R r_;  // todo: wrap in an optional to support types without a default constructor.

 public:
  template <typename Func> void Apply(Func&& f) { r_ = f(); }

  // Rvalue-qualified: returns r_ itself by rvalue reference instead of copying it.
  R&& get() && { return std::forward<R>(r_); }
};
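// Usage sketch (illustrative, not part of the original header):
//
//   detail::ResultMover<int> mover;
//   mover.Apply([] { return 42; });
//   int result = std::move(mover).get();   // get() requires an rvalue, hence std::move.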
// Specialization for callables that return void: there is no result to store.
template <> class ResultMover<void> {
 public:
  template <typename Func> void Apply(Func&& f) { f(); }
  void get() && {}
};

}  // namespace detail

}  // namespace fibers_ext
}  // namespace util