6 #include "base/ProducerConsumerQueue.h" 8 #include <boost/fiber/context.hpp> 10 #include "util/fibers/event_count.h" 11 #include "util/fibers/fibers_ext.h" 14 namespace fibers_ext {
26 typedef ::boost::fibers::context::wait_queue_t wait_queue_t;
27 using spinlock_lock_t = ::boost::fibers::detail::spinlock_lock;
32 template <
typename... Args>
void Push(Args&&... recordArgs) noexcept;
48 template <
typename... Args>
bool TryPush(Args&&... args) noexcept {
49 if (q_.write(std::forward<Args>(args)...)) {
50 if (++throttled_pushes_ > q_.capacity() / 3) {
52 throttled_pushes_ = 0;
68 bool IsClosing()
const {
return is_closing_.load(std::memory_order_relaxed); }
71 unsigned throttled_pushes_ = 0;
73 folly::ProducerConsumerQueue<T> q_;
74 std::atomic_bool is_closing_{
false};
77 EventCount push_ec_, pop_ec_;
81 template <
typename... Args>
82 void SimpleChannel<T>::Push(Args&&... args) noexcept {
83 if (TryPush(std::forward<Args>(args)...))
87 EventCount::Key key = push_ec_.prepareWait();
88 if (TryPush(std::forward<Args>(args)...)) {
91 push_ec_.wait(key.epoch());
95 template <
typename T>
bool SimpleChannel<T>::Pop(T& dest) {
100 EventCount::Key key = pop_ec_.prepareWait();
105 if (is_closing_.load(std::memory_order_acquire)) {
109 pop_ec_.wait(key.epoch());
115 is_closing_.store(
true, std::memory_order_seq_cst);
Single producer - single consumer thread-safe, fiber-friendly channel.
bool TryPop(T &val)
Non blocking pop.
bool TryPush(Args &&... args) noexcept
Non blocking push.