simple_channel.h
1 // Copyright 2019, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 #pragma once
5 
6 #include "base/ProducerConsumerQueue.h"
7 
8 #include <boost/fiber/context.hpp>
9 
10 #include "util/fibers/event_count.h"
11 #include "util/fibers/fibers_ext.h"
12 
13 namespace util {
14 namespace fibers_ext {
15 
25 template <typename T> class SimpleChannel {
26  typedef ::boost::fibers::context::wait_queue_t wait_queue_t;
27  using spinlock_lock_t = ::boost::fibers::detail::spinlock_lock;
28 
29  public:
30  SimpleChannel(size_t n) : q_(n) {}
31 
32  template <typename... Args> void Push(Args&&... recordArgs) noexcept;
33 
34  // Blocking call. Returns false if channel is closed, true otherwise with the popped value.
35  bool Pop(T& dest);
36 
45  void StartClosing();
46 
48  template <typename... Args> bool TryPush(Args&&... args) noexcept {
49  if (q_.write(std::forward<Args>(args)...)) {
50  if (++throttled_pushes_ > q_.capacity() / 3) {
51  pop_ec_.notify();
52  throttled_pushes_ = 0;
53  }
54  return true;
55  }
56  return false;
57  }
58 
60  bool TryPop(T& val) {
61  if (q_.read(val)) {
62  return true;
63  }
64  push_ec_.notify();
65  return false;
66  }
67 
68  bool IsClosing() const { return is_closing_.load(std::memory_order_relaxed); }
69 
70  private:
71  unsigned throttled_pushes_ = 0;
72 
73  folly::ProducerConsumerQueue<T> q_;
74  std::atomic_bool is_closing_{false};
75 
76  // Event counts provide almost negligible contention during fast-path (a single atomic add).
77  EventCount push_ec_, pop_ec_;
78 };
79 
80 template <typename T>
81 template <typename... Args>
82 void SimpleChannel<T>::Push(Args&&... args) noexcept {
83  if (TryPush(std::forward<Args>(args)...)) // fast path.
84  return;
85 
86  while (true) {
87  EventCount::Key key = push_ec_.prepareWait();
88  if (TryPush(std::forward<Args>(args)...)) {
89  break;
90  }
91  push_ec_.wait(key.epoch());
92  }
93 }
94 
95 template <typename T> bool SimpleChannel<T>::Pop(T& dest) {
96  if (TryPop(dest)) // fast path
97  return true;
98 
99  while (true) {
100  EventCount::Key key = pop_ec_.prepareWait();
101  if (TryPop(dest)) {
102  return true;
103  }
104 
105  if (is_closing_.load(std::memory_order_acquire)) {
106  return false;
107  }
108 
109  pop_ec_.wait(key.epoch());
110  }
111 }
112 
113 template <typename T> void SimpleChannel<T>::StartClosing() {
114  // Full barrier, StartClosing performance does not matter.
115  is_closing_.store(true, std::memory_order_seq_cst);
116  pop_ec_.notifyAll();
117 }
118 
119 } // namespace fibers_ext
120 } // namespace util
Single-producer, single-consumer, thread-safe, fiber-friendly channel.
bool TryPop(T &val)
Non-blocking pop.
bool TryPush(Args &&... args) noexcept
Non-blocking push.