12 #include "util/uring/proactor_pool.h" 24 uint32_t MoveTsIfNeeded(
size_t size, int32_t* dest)
const;
26 mutable uint32_t last_ts_ = 0;
32 void CheckInit()
const;
33 unsigned ProactorThreadIndex()
const;
47 static_assert(NUM > 1,
"Invalid window size");
50 mutable std::array<T, NUM> count_;
61 void IncBy(int32_t delta) {
62 int32_t bin = MoveTsIfNeeded(NUM, count_.data());
70 MoveTsIfNeeded(NUM, count_.data());
71 return std::accumulate(count_.begin(), count_.end(), 0);
84 enum {WIN_SIZE = NUM};
90 sc_thread_map_.reset(
new Counter[pp_->size()]);
94 sc_thread_map_[ProactorThreadIndex()].Inc();
97 uint32_t Sum()
const {
100 std::atomic_uint32_t res{0};
102 res.fetch_add(sc_thread_map_[i].Sum(), std::memory_order_relaxed);
105 return res.load(std::memory_order_release);
108 uint32_t SumTail()
const {
111 std::atomic_uint32_t res{0};
113 res.fetch_add(sc_thread_map_[i].SumTail(), std::memory_order_relaxed);
116 return res.load(std::memory_order_release);
120 std::unique_ptr<Counter[]> sc_thread_map_;
128 int32_t start = MoveTsIfNeeded(NUM, count_.data()) + 1;
131 for (
unsigned i = 0; i < NUM - 1; ++i) {
132 sum += count_[(start + i) % NUM];
void AwaitOnAll(Func &&func)
Runs the function in all IO threads asynchronously. Blocks until all the asynchronous calls return.
Sliding window data structure that can aggregate moving statistics. It's implemented using a ring-buffer...