sliding_counter.h
1 // Copyright 2020, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 
5 #pragma once
6 
#include <array>
#include <atomic>
#include <cstdint>
#include <memory>
#include <numeric>

#include "util/uring/proactor_pool.h"
13 
14 namespace util {
15 namespace uring {
16 
17 namespace detail {
18 
20  protected:
  // Returns the bin corresponding to the current timestamp, with one-second precision.
  // Updates last_ts_ according to the current timestamp and returns the latest bin.
  // Has const semantics even though it updates the mutable last_ts_.
24  uint32_t MoveTsIfNeeded(size_t size, int32_t* dest) const;
25 
26  mutable uint32_t last_ts_ = 0;
27 };
28 
30  protected:
31  void InitInternal(ProactorPool* pp);
32  void CheckInit() const;
33  unsigned ProactorThreadIndex() const;
34 
35  ProactorPool* pp_ = nullptr;
36 };
37 
38 } // namespace detail
39 
46 template <unsigned NUM> class SlidingCounterTL : protected detail::SlidingCounterTLBase {
47  static_assert(NUM > 1, "Invalid window size");
48 
49  using T = int32_t;
50  mutable std::array<T, NUM> count_;
51 
52  public:
54  Reset();
55  }
56 
57  void Inc() {
58  IncBy(1);
59  }
60 
61  void IncBy(int32_t delta) {
62  int32_t bin = MoveTsIfNeeded(NUM, count_.data());
63  count_[bin] += delta;
64  }
65 
66  // Sums over bins not including the last bin that is currently being filled.
67  T SumTail() const;
68 
69  T Sum() const {
70  MoveTsIfNeeded(NUM, count_.data());
71  return std::accumulate(count_.begin(), count_.end(), 0);
72  }
73 
74  void Reset() {
75  count_.fill(0);
76  }
77 };
78 
// Requires that proactor_pool has initialized all the proactors.
80 template <unsigned NUM> class SlidingCounter : protected detail::SlidingCounterBase {
82 
83  public:
84  enum {WIN_SIZE = NUM};
85 
86  SlidingCounter() = default;
87 
88  void Init(ProactorPool* pp) {
89  InitInternal(pp);
90  sc_thread_map_.reset(new Counter[pp_->size()]);
91  }
92 
93  void Inc() {
94  sc_thread_map_[ProactorThreadIndex()].Inc();
95  }
96 
97  uint32_t Sum() const {
98  CheckInit();
99 
100  std::atomic_uint32_t res{0};
101  pp_->AwaitOnAll([&](unsigned i, Proactor*) {
102  res.fetch_add(sc_thread_map_[i].Sum(), std::memory_order_relaxed);
103  });
104 
105  return res.load(std::memory_order_release);
106  }
107 
108  uint32_t SumTail() const {
109  CheckInit();
110 
111  std::atomic_uint32_t res{0};
112  pp_->AwaitOnAll([&](unsigned i, Proactor*) {
113  res.fetch_add(sc_thread_map_[i].SumTail(), std::memory_order_relaxed);
114  });
115 
116  return res.load(std::memory_order_release);
117  }
118 
119  private:
120  std::unique_ptr<Counter[]> sc_thread_map_;
121 };
122 
123 /*********************************************
124  Implementation section.
125 **********************************************/
126 
127 template <unsigned NUM> auto SlidingCounterTL<NUM>::SumTail() const -> T {
128  int32_t start = MoveTsIfNeeded(NUM, count_.data()) + 1; // the tail is one after head.
129 
130  T sum = 0;
131  for (unsigned i = 0; i < NUM - 1; ++i) {
132  sum += count_[(start + i) % NUM];
133  }
134  return sum;
135 }
136 
137 } // namespace uring
138 } // namespace util
void AwaitOnAll(Func &&func)
Runs the function in all IO threads asynchronously. Blocks until all the asynchronous calls return.
Sliding window data structure that can aggregate moving statistics. It's implemented using a ring buffer...