6 #include <boost/fiber/fss.hpp>     9 #include "absl/container/flat_hash_map.h"    11 #include "mr/impl/freq_map_wrapper.h"    12 #include "mr/mr_types.h"    13 #include "mr/output.h"    14 #include "strings/unique_strings.h"    22 template <
typename Handler, 
typename ToType> 
class HandlerWrapper;
    24 void VerifyUnspecifiedSharding(
const pb::Output& outp);
    34   static_assert(
sizeof(base::void_t<Record>) == 0, 
"Please specify RecordTraits<> for this type");
    38   static std::string Serialize(
bool is_binary, std::string&& r) { 
return std::move(r); }
    40   static bool Parse(
bool is_binary, std::string&& tmp, std::string* res) {
    41     *res = std::move(tmp);
    52   template <
typename T> 
friend class DoContext;
    56   using InputMetaData = absl::variant<absl::monostate, int64_t, std::string>;
    57   using FreqMapRegistry =
    58     absl::flat_hash_map<std::string, detail::FreqMapWrapper>;
    67   virtual void CloseShard(
const ShardId& sid) = 0;
    70   void IncBy(StringPiece name, 
long delta) { metric_map_[name] += delta; }
    71   void Inc(StringPiece name) { 
IncBy(name, 1); }
    72   StringPieceDenseMap<long>& metric_map() { 
return metric_map_; }
    77     for (
const auto& k_v : metric_map_)
    78       (*metric_map)[std::string(k_v.first)] += k_v.second;
    83     Write(shard_id, std::move(record));
    86   void EmitParseError() { ++metric_map_[
"parse-errors"]; }
    89   FrequencyMap<T>&  GetFreqMapStatistic(
const std::string& map_id) {
    90     auto res = freq_maps_.emplace(map_id, detail::FreqMapWrapper());
    92       res.first->second = detail::FreqMapWrapper(FrequencyMap<T>());
    94     return res.first->second.Cast<T>();
    99   const FrequencyMap<T>* FindMaterializedFreqMapStatistic(
   100       const std::string& map_id)
 const {
   101     const detail::FreqMapWrapper *ptr = FindMaterializedFreqMapStatisticImpl(map_id);
   102     return &ptr->Cast<T>();
   107     std::string file_name;
   110     size_t input_pos = 0;
   111     bool is_binary = 
false;
   117     return per_fiber_.get();
   121   void Write(
const ShardId& shard_id, std::string&& record) {
   122     ++metric_map_[
"fn-writes"];
   123     WriteInternal(shard_id, std::move(record));
   126   const detail::FreqMapWrapper *FindMaterializedFreqMapStatisticImpl(
const std::string&) 
const;
   129   virtual void WriteInternal(
const ShardId& shard_id, std::string&& record) = 0;
   131   ::boost::fibers::fiber_specific_ptr<PerFiber> per_fiber_;
   133   StringPieceDenseMap<long> metric_map_;
   134   FreqMapRegistry freq_maps_;
   135   const FreqMapRegistry* finalized_maps_ = 
nullptr;
   143   const FrequencyMap<T>* FindMaterializedFreqMapStatistic(
const std::string& map_id)
 const {
   144     return raw_->FindMaterializedFreqMapStatistic<T>(map_id);
   154   template <
typename Handler, 
typename ToType> 
friend class detail::HandlerWrapper;
   158       : out_(out), context_(context), context_fiber_local_(context->per_fiber()) {}
   160   template<
typename U> 
void Write(
const ShardId& shard_id, U&& u) {
   161     context_->Write(shard_id, rt_.Serialize(out_.is_binary(), std::forward<U>(u)));
   165     ShardId shard_id = out_.Shard(t);
   170     ShardId shard_id = out_.Shard(t);
   171     Write(shard_id, std::move(t));
   174   RawContext* raw() { 
return context_; }
   177   bool is_binary()
 const { 
return context_fiber_local_->is_binary; }
   178   const std::string& input_file_name()
 const { 
return context_fiber_local_->file_name; }
   180   size_t input_pos()
 const { 
return context_fiber_local_->input_pos; }
   183   void SetOutputShard(ShardId sid) {
   184     detail::VerifyUnspecifiedSharding(out_.msg());
   185     out_.SetConstantShard(std::move(sid));
   188   void CloseShard(
const ShardId& sid) { raw()->CloseShard(sid); }
   193   RawContext* context_;
   194   const RawContext::PerFiber* context_fiber_local_;
 void TEST_Write(const ShardId &shard_id, std::string &&record)
Used only in tests.
 
absl::variant< absl::monostate, int64_t, std::string > InputMetaData
std/absl monostate is an empty class that gives variant optional semantics.
 
void IncBy(StringPiece name, long delta)
MR metrics - are used for monitoring, exposing statistics via http.
 
Base class for operator executors.
 
void UpdateMetricMap(MetricMap *metric_map)