6 #include <boost/fiber/mutex.hpp> 9 #include "absl/container/flat_hash_map.h" 17 class OperatorExecutor;
23 :
PTable<T>(std::move(impl)), input_(ib) {}
// Forwards num_records to set_skip_header() on the message returned by
// input_->mutable_msg().
// NOTE(review): presumably num_records is the count of leading header records
// to skip per file - confirm against the pb::Input definition.
// NOTE(review): the return statement and closing brace of this method are not
// visible in this excerpt.
26 PInput<T>& set_skip_header(
unsigned num_records) {
27 input_->mutable_msg()->set_skip_header(num_records);
43 friend class detail::TableBase;
// File specs held by this InputSpec. Filled by the FileSpec-vector constructor
// and exposed read-only through file_spec().
52 std::vector<pb::Input::FileSpec> file_spec_;
// Constructs an InputSpec from a vector of strings (parameter name: globs).
// Declaration only; the definition is not part of this excerpt.
// The constructor is not marked explicit, so a std::vector<std::string> can be
// converted to an InputSpec implicitly.
55 InputSpec(
const std::vector<std::string>& globs);
// Constructs an InputSpec from ready-made FileSpec entries. The vector is taken
// by value and moved into file_spec_, so callers can hand over ownership with
// std::move and avoid a copy.
57 InputSpec(std::vector<pb::Input::FileSpec> globs) : file_spec_{std::move(globs)} {}
// Returns a const reference to the stored FileSpec vector (no copy is made).
59 const std::vector<pb::Input::FileSpec>& file_spec()
const {
return file_spec_; }
63 return Read(name, pb::WireFormat::TXT, input_spec);
67 return ReadText(name, InputSpec{glob});
// Creates a string input named `name` from `input_spec` by delegating to Read()
// with pb::WireFormat::LST.
// NOTE(review): the closing brace of this method is not visible in this excerpt.
70 PInput<std::string> ReadLst(
const std::string& name,
const InputSpec& input_spec) {
71 return Read(name, pb::WireFormat::LST, input_spec);
// Overload taking a vector of strings (globs). Delegates to Read() with
// pb::WireFormat::LST.
// NOTE(review): the only Read() visible here takes a const InputSpec&, so
// `globs` is presumably converted through the non-explicit
// InputSpec(const std::vector<std::string>&) constructor - confirm.
// NOTE(review): the closing brace of this method is not visible in this excerpt.
74 PInput<std::string> ReadLst(
const std::string& name,
const std::vector<std::string>& globs) {
75 return Read(name, pb::WireFormat::LST, globs);
// Convenience overload for a single string: wraps `glob` in a one-element
// vector and forwards to ReadLst(name, std::vector<std::string>).
// NOTE(review): the closing brace of this method is not visible in this excerpt.
78 PInput<std::string> ReadLst(
const std::string& name,
const std::string& glob) {
79 return ReadLst(name, std::vector<std::string>{glob});
// Runs the pipeline and blocks the current thread.
// Declaration only; the meaning of the bool result is not visible in this
// excerpt.
88 bool Run(Runner* runner);
// Declares Join: a member template parameterized on GrouperType, Out and a
// parameter pack Args, returning PTable<Out>. It takes the table name and an
// initializer list of detail::HandlerBinding<GrouperType, Out>.
// NOTE(review): the remaining parameters (presumably Args&&...) are cut off in
// this excerpt.
93 template <
typename GrouperType,
typename Out,
typename... Args>
94 PTable<Out> Join(
const std::string& name,
95 std::initializer_list<detail::HandlerBinding<GrouperType, Out>> mapper_bindings,
// Returns a mutable pb::Input pointer selected by the given string.
// NOTE(review): the parameter is unnamed; presumably it is the input name used
// as the key of inputs_ - confirm against the definition.
98 pb::Input* mutable_input(
const std::string&);
// Looks up `map_id` in freq_maps_. When an entry exists, returns the address of
// that entry viewed as FrequencyMap<T> via Cast<T>().
// NOTE(review): the template header and the body of the not-found branch are
// not visible in this excerpt; presumably it returns nullptr - confirm.
101 const FrequencyMap<T>* GetFreqMap(
const std::string& map_id)
const {
102 auto it = freq_maps_.find(map_id);
103 if (it == freq_maps_.end())
105 return &it->second.Cast<T>();
// Common entry point used by the ReadLst overloads (and the ReadText fragments
// visible above): creates a string input named `name` with the given wire
// format and InputSpec. Declaration only; the definition is not part of this
// excerpt.
108 PInput<std::string> Read(
const std::string& name, pb::WireFormat::Type format,
109 const InputSpec& globs);
// Returns a read-only InputBase pointer for `name`. Declaration only.
// NOTE(review): the "Checked" prefix suggests the lookup asserts that the input
// exists - confirm against the definition.
111 const InputBase* CheckedInput(
const std::string& name)
const;
// Processes a single table. Declaration only; what processing involves is not
// visible in this excerpt.
112 void ProcessTable(detail::TableBase* tbl);
// Inputs keyed by string; the map owns each InputBase through unique_ptr.
115 absl::flat_hash_map<std::string, std::unique_ptr<InputBase>> inputs_;
// Tables held by shared ownership.
116 std::vector<std::shared_ptr<detail::TableBase>> tables_;
// Fiber-level mutex.
// NOTE(review): which members it guards is not visible in this excerpt.
118 ::boost::fibers::mutex mu_;
// Shared handle to an operator executor.
// NOTE(review): presumably the executor of the operator currently running -
// confirm.
119 std::shared_ptr<OperatorExecutor> executor_;
// Atomic flag, initially false.
// NOTE(review): presumably set by Stop() - confirm.
120 std::atomic_bool stopped_{
false};
// Frequency maps keyed by map id; queried by GetFreqMap().
122 RawContext::FreqMapRegistry freq_maps_;
// Metric maps keyed by string, kept in an ordered std::map.
123 std::map<std::string, MetricMap> metric_maps_;
// Out-of-class definition of Pipeline::Join. Builds the table implementation
// through detail::TableImplT<OutT>::AsGroup<GrouperType>(), passing the name,
// the mapper bindings and this pipeline, then wraps the result in a
// PTable<OutT>.
// NOTE(review): the end of the parameter list, the remaining AsGroup arguments
// and the closing brace are not visible in this excerpt.
126 template <
typename GrouperType,
typename OutT,
typename... Args>
127 PTable<OutT> Pipeline::Join
128 (
const std::string& name,
129 std::initializer_list<detail::HandlerBinding<GrouperType, OutT>> mapper_bindings,
131 auto res = detail::TableImplT<OutT>::template AsGroup<GrouperType>(name, mapper_bindings,
this,
133 return PTable<OutT>{res};
// Free helper that produces a HandlerBinding<Joiner, Out> for a table by
// calling tbl.BindWith(ptr). `ptr` has type EmitMemberFn<S, Joiner, Out>.
// NOTE(review): presumably intended for building the mapper_bindings list
// passed to Pipeline::Join - confirm.
// NOTE(review): the closing brace of this function is not visible in this
// excerpt.
136 template <
typename U,
typename Joiner,
typename Out,
typename S>
137 detail::HandlerBinding<Joiner, Out> JoinInput(
const PTable<U>& tbl,
138 EmitMemberFn<S, Joiner, Out> ptr) {
139 return tbl.BindWith(ptr);
void Stop()
Stops/breaks the run.
Serves as the main entry point for setting up and configuring the mapreduce pipeline.
A pool of IoContext objects, representing and managing CPU resources of the system.
bool Run(Runner *runner)
Runs the pipeline and blocks the current thread.