6 #include <boost/fiber/mutex.hpp>     9 #include "absl/container/flat_hash_map.h"    17 class OperatorExecutor;
    23     : 
PTable<T>(std::move(impl)), input_(ib) {}
    26   PInput<T>& set_skip_header(
unsigned num_records) {
    27     input_->mutable_msg()->set_skip_header(num_records);
    43   friend class detail::TableBase;
    52     std::vector<pb::Input::FileSpec> file_spec_;
    55     InputSpec(
const std::vector<std::string>& globs);
    57     InputSpec(std::vector<pb::Input::FileSpec> globs) : file_spec_{std::move(globs)} {}
    59     const std::vector<pb::Input::FileSpec>& file_spec()
 const { 
return file_spec_; }
    63     return Read(name, pb::WireFormat::TXT, input_spec);
    67     return ReadText(name, InputSpec{glob});
    70   PInput<std::string> ReadLst(
const std::string& name, 
const InputSpec& input_spec) {
    71     return Read(name, pb::WireFormat::LST, input_spec);
    74   PInput<std::string> ReadLst(
const std::string& name, 
const std::vector<std::string>& globs) {
    75     return Read(name, pb::WireFormat::LST, globs);
    78   PInput<std::string> ReadLst(
const std::string& name, 
const std::string& glob) {
    79     return ReadLst(name, std::vector<std::string>{glob});
    88   bool Run(Runner* runner);
    93   template <
typename GrouperType, 
typename Out, 
typename... Args>
    94   PTable<Out> Join(
const std::string& name,
    95                    std::initializer_list<detail::HandlerBinding<GrouperType, Out>> mapper_bindings,
    98   pb::Input* mutable_input(
const std::string&);
   101   const FrequencyMap<T>* GetFreqMap(
const std::string& map_id)
 const {
   102     auto it = freq_maps_.find(map_id);
   103     if (it == freq_maps_.end())
   105     return &it->second.Cast<T>();
   108   PInput<std::string> Read(
const std::string& name, pb::WireFormat::Type format,
   109                            const InputSpec& globs);
   111   const InputBase* CheckedInput(
const std::string& name) 
const;
   112   void ProcessTable(detail::TableBase* tbl);
   115   absl::flat_hash_map<std::string, std::unique_ptr<InputBase>> inputs_;
   116   std::vector<std::shared_ptr<detail::TableBase>> tables_;
   118   ::boost::fibers::mutex mu_;
   119   std::shared_ptr<OperatorExecutor> executor_;  
   120   std::atomic_bool stopped_{
false};
   122   RawContext::FreqMapRegistry freq_maps_;
   123   std::map<std::string, MetricMap> metric_maps_;
   126  template <
typename GrouperType, 
typename OutT, 
typename... Args>
   127 PTable<OutT> Pipeline::Join
   128     (
const std::string& name,
   129      std::initializer_list<detail::HandlerBinding<GrouperType, OutT>> mapper_bindings,
   131   auto res = detail::TableImplT<OutT>::template AsGroup<GrouperType>(name, mapper_bindings, 
this,
   133   return PTable<OutT>{res};
   136 template <
typename U, 
typename Joiner, 
typename Out, 
typename S>
   137 detail::HandlerBinding<Joiner, Out> JoinInput(
const PTable<U>& tbl,
   138                                               EmitMemberFn<S, Joiner, Out> ptr) {
   139   return tbl.BindWith(ptr);
 
void Stop()
Stops (interrupts) the current pipeline run.
Serves as the main entry point for setting up and configuring the mapreduce pipeline.
A pool of IoContext objects that represents and manages the system's CPU resources.
bool Run(Runner *runner)
Runs the pipeline and blocks the current thread.