runner.h
1 // Copyright 2019, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 #pragma once
5 
#include <cstddef>
#include <functional>
#include <string>

#include "absl/container/flat_hash_map.h"
#include "absl/strings/string_view.h"
#include "mr/mr3.pb.h"
#include "mr/mr_types.h"
10 
11 namespace mr3 {
12 
13 // value.second (string) - is a file glob that corresponds to 1 or more files comprising the shard.
14 // i.e can be "shard-0000-*.txt.gz" but can be a single file as well.
15 // To get the exact list, call ExpandGlob() on each value.
16 using ShardFileMap = absl::flat_hash_map<ShardId, std::string>;
17 
18 class RawContext;
19 
20 class Runner {
21  public:
22  virtual ~Runner();
23 
24  virtual void Init() = 0;
25 
26  virtual void Shutdown() = 0;
27 
28  // It's guaranteed that op will live until OperatorEnd is called.
29  virtual void OperatorStart(const pb::Operator* op) = 0;
30 
31  // Must be thread-safe. Called from multiple threads in operator_executors.
32  virtual RawContext* CreateContext() = 0;
33 
34  virtual void OperatorEnd(ShardFileMap* out_files) = 0;
35 
36  using ExpandCb = std::function<void(size_t file_size, const std::string&)>;
37 
38  virtual void ExpandGlob(const std::string& glob, ExpandCb cb) = 0;
39 
40  // Read file and fill queue. This function must be fiber-friendly.
41  // Returns number of records processed.
42  virtual size_t ProcessInputFile(const std::string& filename, pb::WireFormat::Type type,
43  RawSinkCb cb) = 0;
44 
45  virtual void SaveFile(absl::string_view fn, absl::string_view data) = 0;
46 };
47 
48 } // namespace mr3