varz.cc
1 // Copyright 2020, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 
5 #include "util/uring/varz.h"
6 
7 using namespace boost;
8 
9 namespace util {
10 namespace uring {
11 
12 VarzValue VarzQps::GetData() const {
13 
14  uint32_t qps = val_.SumTail() / (Counter::WIN_SIZE - 1); // Average over kWinSize values.
15  return VarzValue::FromInt(qps);
16 }
17 
18 VarzMapAverage::~VarzMapAverage() {}
19 
20 void VarzMapAverage::Init(ProactorPool* pp) {
21  CHECK(pp_ == nullptr);
22  pp_ = CHECK_NOTNULL(pp);
23  avg_map_.reset(new Map[pp->size()]);
24 }
25 
26 unsigned VarzMapAverage::ProactorThreadIndex() const {
27  unsigned tnum = CHECK_NOTNULL(pp_)->size();
28 
29  int32_t indx = Proactor::GetIndex();
30  CHECK_GE(indx, 0) << "Must be called from proactor thread!";
31  CHECK_LT(indx, tnum) << "Invalid thread index " << indx;
32 
33  return unsigned(indx);
34 }
35 
36 auto VarzMapAverage::FindSlow(absl::string_view key) -> Map::iterator {
37  auto str = pp_->GetString(key);
38  auto& map = avg_map_[Proactor::GetIndex()];
39  auto res = map.emplace(str, SumCnt{});
40 
41  CHECK(res.second);
42  return res.first;
43 }
44 
45 VarzValue VarzMapAverage::GetData() const {
46  CHECK(pp_);
47 
48  AnyValue::Map result;
49  fibers::mutex mu;
50 
51  auto cb = [&](unsigned index, Proactor*) {
52  auto& map = avg_map_[index];
53 
54  for (const auto& k_v : map) {
55  AnyValue::Map items;
56  int64 count = k_v.second.second.Sum();
57  int64 sum = k_v.second.first.Sum();
58  items.emplace_back("count", VarzValue::FromInt(count));
59  items.emplace_back("sum", VarzValue::FromInt(sum));
60 
61  if (count) {
62  double avg = count > 0 ? double(sum) / count : 0;
63  items.emplace_back("average", VarzValue::FromDouble(avg));
64  }
65 
66  std::unique_lock<fibers::mutex> lk(mu);
67  result.emplace_back(std::string(k_v.first), std::move(items));
68  }
69  };
70  pp_->AwaitFiberOnAll(cb);
71 
72  return result;
73 }
74 
75 }
76 } // namespace util