mapper_executor.h
1 // Copyright 2019, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 #pragma once
5 
6 #include <boost/fiber/buffered_channel.hpp>
7 #include <functional>
8 
9 #include "mr/operator_executor.h"
10 #include "util/fibers/simple_channel.h"
11 
12 namespace mr3 {
13 
15  struct FileInput {
16  const pb::Input* input;
17  size_t spec_index;
18  size_t file_size;
19  ::std::string file_name;
20  };
// Fiber channel type that carries FileInput items (Boost.Fiber buffered_channel).
21  using FileNameQueue = ::boost::fibers::buffered_channel<FileInput>;
22 
23  struct Record {
24  enum Operand { UNDEFINED, BINARY_FORMAT, TEXT_FORMAT, METADATA, RECORD} op = UNDEFINED;
25 
26  // either file spec or <pos,record> pair.
27  absl::variant<const pb::Input::FileSpec*, ::std::pair<size_t, ::std::string>> payload;
28 
29  Record() = default;
30 
31  Record(Operand op2, size_t pos, ::std::string val)
32  : op(op2), payload(::std::pair<size_t, ::std::string>{pos, ::std::move(val)}) {}
33 
34  Record(Operand op2, const pb::Input::FileSpec* fspec)
35  : op(op2), payload(fspec) {}
36  };
37 
39 
40  public:
// Destructor; only declared here, the definition lives outside this header.
42  ~MapperExecutor();
43 
// Entry point that takes the list of inputs to process. Marked `final`, so it
// overrides a virtual declared in a base class and cannot be overridden further.
// NOTE(review): the roles of `ss` and `out_files` are not visible in this
// header; `out_files` is presumably an output parameter describing the shard
// files that were produced - confirm against the base class declaration.
44  void Run(const std::vector<const InputBase*>& inputs, detail::TableBase* ss,
45  ShardFileMap* out_files) final;
46 
// `final` override of a base-class virtual.
// NOTE(review): whether this blocks until in-flight work has drained is not
// visible from this header - confirm in the implementation.
47  // Stops the executor in the middle.
48  void Stop() final;
49 
50  private:
// Initialization hook; private `final` override of a base-class virtual.
// NOTE(review): who calls it and when is decided by the base class, which is
// not visible here.
51  void InitInternal() final;
52 
// NOTE(review): presumably enqueues the files belonging to the given input
// into file_name_q_ - confirm in the implementation file.
53  void PushInput(const InputBase*);
54 
55  // Input managing fiber that reads files from disk and pumps data into record_q.
56  // One per IO thread.
// NOTE(review): how `tb` is used by the fiber is not visible from this header.
57  void IOReadFiber(detail::TableBase* tb);
58 
// Setup step parameterized by an IO thread index.
// NOTE(review): presumably invoked once on every IO thread - confirm at the call site.
59  // index - io thread index.
60  void SetupPerIoThread(unsigned index, detail::TableBase* tb);
61 
// Static member: it receives the queue and the table explicitly and has no
// implicit access to instance state.
// NOTE(review): presumably the consumer side of record_q that feeds records
// to the mapping code behind `tb` - confirm in the implementation file.
62  static void MapFiber(RecordQueue* record_q, detail::TableBase* tb);
63 
// Uniquely-owned channel of FileInput entries.
// NOTE(review): where it is created and closed is in the implementation file.
64  std::unique_ptr<FileNameQueue> file_name_q_;
65 };
66 
67 } // namespace mr3
Base class for operator executors.
Single-producer, single-consumer, thread-safe, fiber-friendly channel.
A pool of IoContext objects, representing and managing CPU resources of the system.