pipeline.h
1 // Copyright 2019, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 #pragma once
5 
#include <utility>

#include <boost/fiber/mutex.hpp>
#include "mr/ptable.h"

#include "absl/container/flat_hash_map.h"
10 
11 namespace util {
12 class IoContextPool;
13 } // namespace util
14 
15 namespace mr3 {
16 class Runner;
17 class OperatorExecutor;
18 
19 template <typename T> class PInput : public PTable<T> {
20  friend class Pipeline;
21 
22  PInput(std::shared_ptr<detail::TableImplT<T>> impl, InputBase* ib)
23  : PTable<T>(std::move(impl)), input_(ib) {}
24 
25  public:
26  PInput<T>& set_skip_header(unsigned num_records) {
27  input_->mutable_msg()->set_skip_header(num_records);
28  return *this;
29  }
30 
31  private:
32  InputBase* input_;
33 };
34 
35 
42 class Pipeline {
43  friend class detail::TableBase;
44 
45  public:
46  explicit Pipeline(util::IoContextPool* pool);
47  ~Pipeline();
48 
51  class InputSpec {
52  std::vector<pb::Input::FileSpec> file_spec_;
53 
54  public:
55  InputSpec(const std::vector<std::string>& globs);
56  InputSpec(const std::string& glob) : InputSpec(std::vector<std::string>{glob}) {}
57  InputSpec(std::vector<pb::Input::FileSpec> globs) : file_spec_{std::move(globs)} {}
58 
59  const std::vector<pb::Input::FileSpec>& file_spec() const { return file_spec_; }
60  };
61 
62  PInput<std::string> ReadText(const std::string& name, const InputSpec& input_spec) {
63  return Read(name, pb::WireFormat::TXT, input_spec);
64  }
65 
66  PInput<std::string> ReadText(const std::string& name, const std::string& glob) {
67  return ReadText(name, InputSpec{glob});
68  }
69 
70  PInput<std::string> ReadLst(const std::string& name, const InputSpec& input_spec) {
71  return Read(name, pb::WireFormat::LST, input_spec);
72  }
73 
74  PInput<std::string> ReadLst(const std::string& name, const std::vector<std::string>& globs) {
75  return Read(name, pb::WireFormat::LST, globs);
76  }
77 
78  PInput<std::string> ReadLst(const std::string& name, const std::string& glob) {
79  return ReadLst(name, std::vector<std::string>{glob});
80  }
81 
88  bool Run(Runner* runner);
89 
91  void Stop();
92 
93  template <typename GrouperType, typename Out, typename... Args>
94  PTable<Out> Join(const std::string& name,
95  std::initializer_list<detail::HandlerBinding<GrouperType, Out>> mapper_bindings,
96  Args&&... args);
97 
98  pb::Input* mutable_input(const std::string&);
99 
100  template <class T>
101  const FrequencyMap<T>* GetFreqMap(const std::string& map_id) const {
102  auto it = freq_maps_.find(map_id);
103  if (it == freq_maps_.end())
104  return nullptr;
105  return &it->second.Cast<T>();
106  }
107  private:
108  PInput<std::string> Read(const std::string& name, pb::WireFormat::Type format,
109  const InputSpec& globs);
110 
111  const InputBase* CheckedInput(const std::string& name) const;
112  void ProcessTable(detail::TableBase* tbl);
113 
114  util::IoContextPool* pool_;
115  absl::flat_hash_map<std::string, std::unique_ptr<InputBase>> inputs_;
116  std::vector<std::shared_ptr<detail::TableBase>> tables_;
117 
118  ::boost::fibers::mutex mu_;
119  std::shared_ptr<OperatorExecutor> executor_; // guarded by mu_
120  std::atomic_bool stopped_{false};
121 
122  RawContext::FreqMapRegistry freq_maps_;
123  std::map<std::string, MetricMap> metric_maps_;
124 };
125 
126  template <typename GrouperType, typename OutT, typename... Args>
127 PTable<OutT> Pipeline::Join
128  (const std::string& name,
129  std::initializer_list<detail::HandlerBinding<GrouperType, OutT>> mapper_bindings,
130  Args&&... args) {
131  auto res = detail::TableImplT<OutT>::template AsGroup<GrouperType>(name, mapper_bindings, this,
132  args...);
133  return PTable<OutT>{res};
134 }
135 
136 template <typename U, typename Joiner, typename Out, typename S>
137 detail::HandlerBinding<Joiner, Out> JoinInput(const PTable<U>& tbl,
138  EmitMemberFn<S, Joiner, Out> ptr) {
139  return tbl.BindWith(ptr);
140 }
141 
142 } // namespace mr3
Convenience class that represents multiple types that can be treated as input globs.
Definition: pipeline.h:51
void Stop()
Stops/breaks the run.
Serves as the main entry point for setting up and configuring the mapreduce pipeline.
Definition: pipeline.h:42
A pool of IoContext objects, representing and managing CPU resources of the system.
bool Run(Runner *runner)
Runs the pipeline and blocks the current thread.