do_context.h
1 // Copyright 2019, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 #pragma once
5 
6 #include <boost/fiber/fss.hpp>
7 #include <string>
8 
9 #include "absl/container/flat_hash_map.h"
10 
11 #include "mr/impl/freq_map_wrapper.h"
12 #include "mr/mr_types.h"
13 #include "mr/output.h"
14 #include "strings/unique_strings.h"
15 
16 namespace mr3 {
17 
18 template <typename T> class DoContext;
19 class OperatorExecutor;
20 
21 namespace detail {
22 template <typename Handler, typename ToType> class HandlerWrapper;
23 
24 void VerifyUnspecifiedSharding(const pb::Output& outp);
25 
26 } // namespace detail
27 
28 // User facing interfaces. void tag for dispatching per class of types
29 // (i.e. derived from protobuf::Message etc).
30 // TODO: this design is not composable.
31 // i.e. I would like to be able to define serializers for basic types and easily compose more
32 // complicated ones.
// Primary template: deliberately unusable. Instantiating RecordTraits<Record> for a type
// that has no matching specialization stops the build with the message below.
// The second, defaulted parameter is the void tag that partial specializations can use to
// match a whole class of types.
template <typename Record, typename = void> struct RecordTraits {
  // The condition has to depend on Record so that it is checked only when the primary
  // template is actually instantiated. sizeof of a complete object type is never 0, so the
  // assertion always fires. (Applying sizeof to void, as a void_t-based condition does, is
  // ill-formed in ISO C++ and compiles only as a compiler extension.)
  static_assert(sizeof(Record) == 0, "Please specify RecordTraits<> for this type");
};

// Traits for plain std::string records: the string is its own serialized form.
template <> struct RecordTraits<std::string> {
  // Returns the record itself, moved out of r. is_binary is not consulted.
  static std::string Serialize(bool is_binary, std::string&& r) { return std::move(r); }

  // Moves tmp into *res and always returns true. is_binary is not consulted.
  static bool Parse(bool is_binary, std::string&& tmp, std::string* res) {
    *res = std::move(tmp);
    return true;
  }
};
45 
51 class RawContext {
52  template <typename T> friend class DoContext;
53  friend class OperatorExecutor;
54  public:
56  using InputMetaData = absl::variant<absl::monostate, int64_t, std::string>;
57  using FreqMapRegistry =
58  absl::flat_hash_map<std::string, detail::FreqMapWrapper>;
59 
60  RawContext();
61 
62  virtual ~RawContext();
63 
66  virtual void Flush() {}
67  virtual void CloseShard(const ShardId& sid) = 0;
68 
70  void IncBy(StringPiece name, long delta) { metric_map_[name] += delta; }
71  void Inc(StringPiece name) { IncBy(name, 1); }
72  StringPieceDenseMap<long>& metric_map() { return metric_map_; }
73 
76  void UpdateMetricMap(MetricMap* metric_map) {
77  for (const auto& k_v : metric_map_)
78  (*metric_map)[std::string(k_v.first)] += k_v.second;
79  }
80 
82  void TEST_Write(const ShardId& shard_id, std::string&& record) {
83  Write(shard_id, std::move(record));
84  }
85 
86  void EmitParseError() { ++metric_map_["parse-errors"]; }
87 
88  template <class T>
89  FrequencyMap<T>& GetFreqMapStatistic(const std::string& map_id) {
90  auto res = freq_maps_.emplace(map_id, detail::FreqMapWrapper());
91  if (res.second) {
92  res.first->second = detail::FreqMapWrapper(FrequencyMap<T>());
93  }
94  return res.first->second.Cast<T>();
95  }
96 
97  // Finds the map produced by operators in the previous steps
98  template <class T>
99  const FrequencyMap<T>* FindMaterializedFreqMapStatistic(
100  const std::string& map_id) const {
101  const detail::FreqMapWrapper *ptr = FindMaterializedFreqMapStatisticImpl(map_id);
102  return &ptr->Cast<T>();
103  }
104 
105  // Sometimes we run 2 shards per thread, in which case it is important to have these per-fiber
106  struct PerFiber {
107  std::string file_name;
108  ShardId current_shard;
109  InputMetaData metadata;
110  size_t input_pos = 0;
111  bool is_binary = false;
112  };
113 
114  void InitPerFiber();
115 
116  const PerFiber *per_fiber() const {
117  return per_fiber_.get();
118  }
119 
120  private:
121  void Write(const ShardId& shard_id, std::string&& record) {
122  ++metric_map_["fn-writes"];
123  WriteInternal(shard_id, std::move(record));
124  }
125 
126  const detail::FreqMapWrapper *FindMaterializedFreqMapStatisticImpl(const std::string&) const;
127 
128  // To allow testing we mark this function as public.
129  virtual void WriteInternal(const ShardId& shard_id, std::string&& record) = 0;
130 
131  ::boost::fibers::fiber_specific_ptr<PerFiber> per_fiber_;
132 
133  StringPieceDenseMap<long> metric_map_;
134  FreqMapRegistry freq_maps_;
135  const FreqMapRegistry* finalized_maps_ = nullptr;
136 };
137 
139  public:
140  explicit PipelineContext(RawContext* raw) : raw_(raw) {}
141 
142  template <class T>
143  const FrequencyMap<T>* FindMaterializedFreqMapStatistic(const std::string& map_id) const {
144  return raw_->FindMaterializedFreqMapStatistic<T>(map_id);
145  }
146 
147  private:
148  RawContext* raw_;
149 };
150 
151 // This class is created per MapFiber in SetupDoFn and it wraps RawContext.
152 // It's thread-local as well as caching a pointer to the fiber-local part of the RawContext.
153 template <typename T> class DoContext {
154  template <typename Handler, typename ToType> friend class detail::HandlerWrapper;
155 
156  public:
157  DoContext(const Output<T>& out, RawContext* context)
158  : out_(out), context_(context), context_fiber_local_(context->per_fiber()) {}
159 
160  template<typename U> void Write(const ShardId& shard_id, U&& u) {
161  context_->Write(shard_id, rt_.Serialize(out_.is_binary(), std::forward<U>(u)));
162  }
163 
164  void Write(T& t) {
165  ShardId shard_id = out_.Shard(t);
166  Write(shard_id, t);
167  }
168 
169  void Write(T&& t) {
170  ShardId shard_id = out_.Shard(t);
171  Write(shard_id, std::move(t));
172  }
173 
174  RawContext* raw() { return context_; }
175 
176  // These functions are quicker than accessing fiber local through raw()
177  bool is_binary() const { return context_fiber_local_->is_binary; }
178  const std::string& input_file_name() const { return context_fiber_local_->file_name; }
179  const RawContext::InputMetaData& meta_data() const { return context_fiber_local_->metadata; }
180  size_t input_pos() const { return context_fiber_local_->input_pos; }
181 
182  //
183  void SetOutputShard(ShardId sid) {
184  detail::VerifyUnspecifiedSharding(out_.msg());
185  out_.SetConstantShard(std::move(sid));
186  }
187 
188  void CloseShard(const ShardId& sid) { raw()->CloseShard(sid); }
189 
190 private:
191 
192  Output<T> out_;
193  RawContext* context_;
194  const RawContext::PerFiber* context_fiber_local_;
195  RecordTraits<T> rt_;
196 };
197 
198 } // namespace mr3
void TEST_Write(const ShardId &shard_id, std::string &&record)
Used only in tests.
Definition: do_context.h:82
absl::variant< absl::monostate, int64_t, std::string > InputMetaData
std/absl monostate is an empty class that gives a variant optional-like semantics.
Definition: do_context.h:56
void IncBy(StringPiece name, long delta)
MR metrics are used for monitoring and for exposing statistics via HTTP.
Definition: do_context.h:70
Base class for operator executors.
virtual void Flush()
Definition: do_context.h:66
void UpdateMetricMap(MetricMap *metric_map)
Definition: do_context.h:76