6 #include <boost/fiber/fss.hpp> 9 #include "absl/container/flat_hash_map.h" 11 #include "mr/impl/freq_map_wrapper.h" 12 #include "mr/mr_types.h" 13 #include "mr/output.h" 14 #include "strings/unique_strings.h" 22 template <
typename Handler,
typename ToType>
class HandlerWrapper;
// Declaration only; the definition is not part of this excerpt.
// Within this file it is invoked as detail::VerifyUnspecifiedSharding(out_.msg())
// by SetOutputShard(), immediately before a constant shard is installed.
// NOTE(review): presumably it rejects outputs that already have a sharding
// configured — confirm against the definition.
24 void VerifyUnspecifiedSharding(
const pb::Output& outp);
// Compile-time guard whose condition compares a sizeof(...) against 0; the
// operand is written in terms of the template parameter Record.
// NOTE(review): presumably this lives in the unspecialized RecordTraits
// template so that using a record type without its own specialization stops
// the build with the message below — confirm against the enclosing definition.
34 static_assert(
sizeof(base::void_t<Record>) == 0,
"Please specify RecordTraits<> for this type");
// Serialization hook for records that are already std::string.
// The record's contents are moved into the returned string unchanged;
// `is_binary` is accepted but not consulted.
static std::string Serialize(bool is_binary, std::string&& r) {
  std::string serialized(std::move(r));
  return serialized;
}
// Parse counterpart for std::string records: the visible statement moves
// `tmp` into `*res`. `is_binary` is not used by the visible part of the body.
// (The remaining lines of this function are not included in this excerpt.)
40 static bool Parse(
bool is_binary, std::string&& tmp, std::string* res) {
41 *res = std::move(tmp);
52 template <
typename T>
friend class DoContext;
56 using InputMetaData = absl::variant<absl::monostate, int64_t, std::string>;
57 using FreqMapRegistry =
58 absl::flat_hash_map<std::string, detail::FreqMapWrapper>;
67 virtual void CloseShard(
const ShardId& sid) = 0;
70 void IncBy(StringPiece name,
long delta) { metric_map_[name] += delta; }
71 void Inc(StringPiece name) {
IncBy(name, 1); }
72 StringPieceDenseMap<long>& metric_map() {
return metric_map_; }
// Folds this object's counters into the caller-supplied map: for every entry
// of metric_map_, the value is added to (*metric_map)[key], where the key is
// a std::string copy of the entry's name.
// (The enclosing function header is not included in this excerpt.)
77 for (
const auto& k_v : metric_map_)
78 (*metric_map)[std::string(k_v.first)] += k_v.second;
83 Write(shard_id, std::move(record));
86 void EmitParseError() { ++metric_map_[
"parse-errors"]; }
// Returns a mutable reference to the FrequencyMap<T> kept under `map_id` in
// freq_maps_. The visible lines try to emplace a default-constructed
// FreqMapWrapper for the id, assign a wrapper around an empty FrequencyMap<T>
// to the entry, and return the entry viewed through Cast<T>().
// NOTE(review): listing lines between these statements are missing from this
// excerpt; presumably the assignment is guarded by the emplace() result so an
// existing map is not replaced — confirm against the full source.
89 FrequencyMap<T>& GetFreqMapStatistic(
const std::string& map_id) {
90 auto res = freq_maps_.emplace(map_id, detail::FreqMapWrapper());
92 res.first->second = detail::FreqMapWrapper(FrequencyMap<T>());
94 return res.first->second.Cast<T>();
// Looks up the wrapper for `map_id` through
// FindMaterializedFreqMapStatisticImpl() and returns the address of the
// FrequencyMap<T> obtained from it with Cast<T>().
// NOTE(review): `ptr` is dereferenced without a null check; confirm that the
// Impl function never returns nullptr (e.g. that it aborts on a missing id).
99 const FrequencyMap<T>* FindMaterializedFreqMapStatistic(
100 const std::string& map_id)
const {
101 const detail::FreqMapWrapper *ptr = FindMaterializedFreqMapStatisticImpl(map_id);
102 return &ptr->Cast<T>();
107 std::string file_name;
110 size_t input_pos = 0;
111 bool is_binary =
false;
117 return per_fiber_.get();
// Writes one serialized record to shard `shard_id`.
// First raises the "fn-writes" counter in metric_map_, then hands the record
// (moved, not copied) to the pure-virtual WriteInternal() for the actual write.
121 void Write(
const ShardId& shard_id, std::string&& record) {
122 ++metric_map_[
"fn-writes"];
123 WriteInternal(shard_id, std::move(record));
126 const detail::FreqMapWrapper *FindMaterializedFreqMapStatisticImpl(
const std::string&)
const;
// Pure-virtual write hook that derived classes must implement.
// Write() forwards to it with the shard id and the moved record after
// updating the "fn-writes" counter.
129 virtual void WriteInternal(
const ShardId& shard_id, std::string&& record) = 0;
131 ::boost::fibers::fiber_specific_ptr<PerFiber> per_fiber_;
133 StringPieceDenseMap<long> metric_map_;
134 FreqMapRegistry freq_maps_;
135 const FreqMapRegistry* finalized_maps_ =
nullptr;
// Thin forwarder: returns whatever raw_->FindMaterializedFreqMapStatistic<T>()
// yields for `map_id`.
143 const FrequencyMap<T>* FindMaterializedFreqMapStatistic(
const std::string& map_id)
const {
144 return raw_->FindMaterializedFreqMapStatistic<T>(map_id);
154 template <
typename Handler,
typename ToType>
friend class detail::HandlerWrapper;
158 : out_(out), context_(context), context_fiber_local_(context->per_fiber()) {}
// Serializes `u` and writes it to shard `shard_id`.
// `u` is perfectly forwarded into rt_.Serialize() together with the output's
// is_binary() flag; the resulting value is passed to context_->Write().
160 template<
typename U>
void Write(
const ShardId& shard_id, U&& u) {
161 context_->Write(shard_id, rt_.Serialize(out_.is_binary(), std::forward<U>(u)));
165 ShardId shard_id = out_.Shard(t);
170 ShardId shard_id = out_.Shard(t);
171 Write(shard_id, std::move(t));
174 RawContext* raw() {
return context_; }
177 bool is_binary()
const {
return context_fiber_local_->is_binary; }
178 const std::string& input_file_name()
const {
return context_fiber_local_->file_name; }
180 size_t input_pos()
const {
return context_fiber_local_->input_pos; }
// Installs `sid` as the output's constant shard.
// detail::VerifyUnspecifiedSharding() is run on the output's message first;
// `sid` is then moved into out_.SetConstantShard().
183 void SetOutputShard(ShardId sid) {
184 detail::VerifyUnspecifiedSharding(out_.msg());
185 out_.SetConstantShard(std::move(sid));
188 void CloseShard(
const ShardId& sid) { raw()->CloseShard(sid); }
193 RawContext* context_;
194 const RawContext::PerFiber* context_fiber_local_;
void TEST_Write(const ShardId &shard_id, std::string &&record)
Used only in tests.
absl::variant< absl::monostate, int64_t, std::string > InputMetaData
std::monostate / absl::monostate is an empty class that, used as a variant alternative, gives the variant optional-like ("no value") semantics.
void IncBy(StringPiece name, long delta)
MR metrics are used for monitoring and for exposing statistics via HTTP.
Base class for operator executors.
void UpdateMetricMap(MetricMap *metric_map)