operator_executor.h
1 // Copyright 2019, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 #pragma once
5 
6 #include <boost/fiber/mutex.hpp>
7 
8 #include "mr/impl/table_impl.h"
9 #include "mr/runner.h"
10 
11 #include "util/stats/varz_value.h"
12 
13 namespace util {
14 class IoContextPool;
15 } // namespace util
16 
17 namespace mr3 {
18 class InputBase;
19 
20 
27 class OperatorExecutor : public std::enable_shared_from_this<OperatorExecutor> {
28  public:
30  : pool_(pool), runner_(runner) {}
31 
32  virtual ~OperatorExecutor() {}
33 
34  void Init(const RawContext::FreqMapRegistry& prev_maps);
35 
36  virtual void Run(const std::vector<const InputBase*>& inputs,
37  detail::TableBase* ss, ShardFileMap* out_files) = 0;
38 
39  // Stops the executor in the middle.
40  virtual void Stop() = 0;
41 
42  const RawContext::FreqMapRegistry& GetFreqMaps() const { return freq_maps_; }
43  const MetricMap& GetCounterMap() const { return metric_map_; }
44 
45 protected:
46  struct PerIoStruct {
47  unsigned index;
48  std::vector<::boost::fibers::fiber> process_fd;
49  std::unique_ptr<RawContext> raw_context;
50 
51  long *records_read_ptr = nullptr; // To avoid always looking up "fn-calls", used only by mapper.
52  bool stop_early = false; // Used only by mapper.
53 
54  PerIoStruct(unsigned i);
55 
56  void Shutdown();
57  };
58 
59  void RegisterContext(RawContext* context);
60 
62  void FinalizeContext(RawContext* context);
63 
64  util::VarzValue::Map GetStats();
65 
66  static void SetFileName(bool is_binary, const std::string& file_name, RawContext* context) {
67  context->per_fiber_->is_binary = is_binary;
68  context->per_fiber_->file_name = file_name;
69  }
70 
71  static void SetMetaData(const pb::Input::FileSpec& fs, RawContext* context);
72 
73  static void SetPosition(size_t pos, RawContext* context) {
74  context->per_fiber_->input_pos = pos;
75  }
76 
77  static void SetCurrentShard(ShardId shard, RawContext* context) {
78  context->per_fiber_->current_shard = std::move(shard);
79  }
80 
81  virtual void InitInternal() = 0;
82 
83  util::IoContextPool* pool_;
84  Runner* runner_;
85 
88  MetricMap metric_map_;
89  std::atomic<uint64_t> parse_errors_{0};
90 
91  RawContext::FreqMapRegistry freq_maps_;
92  const RawContext::FreqMapRegistry* finalized_maps_;
93 
94  static thread_local std::unique_ptr<PerIoStruct> per_io_;
95 };
96 
97 } // namespace mr3
Base class for operator executors.
void FinalizeContext(RawContext *context)
Called from every IO thread once it has finished running the operator.
A pool of IoContext objects, representing and managing CPU resources of the system.