ptable.h
1 // Copyright 2019, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 #pragma once
5 
// SSE42 is intentionally not enabled for rapidjson: its SSE4.2 code paths may read out of
// bounds because they assume that all inputs are aligned at the end.
8 //
9 // #define RAPIDJSON_SSE42
10 
11 #include <rapidjson/document.h>
12 #include <rapidjson/stringbuffer.h>
13 
14 #include "base/type_traits.h"
15 #include "mr/do_context.h"
16 #include "mr/impl/table_impl.h"
17 #include "mr/mr_types.h"
18 #include "mr/output.h"
19 
20 namespace mr3 {
21 
22 class Pipeline;
23 
24 namespace detail {
25 
// Primary MapperTraits template. It is the fallback used whenever the partial
// specialization below does not apply, i.e. when `decltype(&MapperType::Do)` is not
// well-formed. The static_assert condition depends on MapperType, so it is only
// evaluated (and always fails, with the message below) once this fallback is
// actually instantiated.
template <typename MapperType, typename Enable = void>
struct MapperTraits {
  static_assert(sizeof(MapperType) == 0,
                "Must have member function Do(InputType inp, DoContext<OutputType>* context)");
};
30 
31 template <typename MapperType>
32 struct MapperTraits<MapperType, ::base::void_t<decltype(&MapperType::Do)>>
33  : public EmitFuncTraits<decltype(&MapperType::Do)> {};
34 
35 } // namespace detail
36 
37 // Planning interfaces.
38 class InputBase {
39  public:
40  InputBase(const InputBase&) = delete;
41 
42  InputBase(const std::string& name, pb::WireFormat::Type type,
43  const pb::Output* linked_outp = nullptr)
44  : linked_outp_(linked_outp) {
45  input_.set_name(name);
46  input_.mutable_format()->set_type(type);
47  }
48 
49  void operator=(const InputBase&) = delete;
50 
51  pb::Input* mutable_msg() { return &input_; }
52  const pb::Input& msg() const { return input_; }
53 
54  const pb::Output* linked_outp() const { return linked_outp_; }
55 
56  protected:
57  const pb::Output* linked_outp_;
58  pb::Input input_;
59 };
60 
61 template <typename OutT> class PTable {
62  friend class Pipeline;
63 
64  // apparently template classes of different type can not access own private members.
65  template <typename T> friend class PTable;
66 
67  public:
68  PTable() {}
69  ~PTable() {}
70 
71  Output<OutT>& Write(const std::string& name, pb::WireFormat::Type type) {
72  return impl_->Write(name, type);
73  }
74 
75  template <typename MapType, typename... Args>
77  Args&&... args) const;
78 
79  template <typename Handler, typename ToType, typename U>
80  detail::HandlerBinding<Handler, ToType> BindWith(EmitMemberFn<U, Handler, ToType> ptr) const {
81  return impl_->BindWith(ptr);
82  }
83 
84  template <typename U> PTable<U> As() const { return PTable<U>{impl_->template Rebind<U>()}; }
85 
86  PTable<rapidjson::Document> AsJson() const { return As<rapidjson::Document>(); }
87 
88  protected:
89  using TableImpl = detail::TableImplT<OutT>;
90 
91  explicit PTable(std::shared_ptr<TableImpl> impl) : impl_(std::move(impl)) {}
92 
93  std::shared_ptr<TableImpl> impl_;
94 };
95 
97 
98 template <typename OutT>
99 template <typename MapType, typename... Args>
101  const std::string& name, Args&&... args) const {
102  using mapper_traits_t = detail::MapperTraits<MapType>;
103  using NewOutType = typename mapper_traits_t::OutputType;
104 
105  static_assert(std::is_constructible<typename mapper_traits_t::first_arg_t, OutT&&>::value,
106  "MapperType::Do() first argument "
107  "should be constructed from PTable element type");
108 
109  auto res = detail::TableImplT<NewOutType>::template AsMapFrom<MapType>(
110  name, impl_.get(), std::forward<Args>(args)...);
111  return PTable<NewOutType>{std::move(res)};
112 }
113 
114 template <> class RecordTraits<rapidjson::Document> {
115  std::string tmp_;
116  rapidjson::StringBuffer sb_; // Used by serialize.
117 
118  public:
119  RecordTraits(const RecordTraits& r) {} // we do not copy temporary fields.
120  RecordTraits() {}
121 
122  std::string Serialize(bool is_binary, const rapidjson::Document& doc);
123  bool Parse(bool is_binary, std::string&& tmp, rapidjson::Document* res);
124 };
125 
126 } // namespace mr3
// Pipeline (defined at pipeline.h:42) serves as the main entry point for setting up and
// configuring the mapreduce pipeline.