6 #include <boost/fiber/mutex.hpp> 8 #include "mr/impl/table_impl.h" 11 #include "util/stats/varz_value.h" 30 : pool_(pool), runner_(runner) {}
// Initialization entry point. Receives the frequency-map registry `prev_maps`
// by const reference, so the caller keeps ownership of it.
// NOTE(review): presumably these are maps produced by earlier pipeline stages
// and are what finalized_maps_ ends up pointing at — confirm in the definition.
34 void Init(
const RawContext::FreqMapRegistry& prev_maps);
// Pure virtual: every concrete executor must provide its own Run().
//   inputs    - read-only list of InputBase pointers (not owned by this call).
//   ss        - mutable pointer to the table being executed.
//   out_files - mutable pointer to a ShardFileMap.
// NOTE(review): out_files looks like an output parameter filled in by the
// run — confirm against the implementations.
36 virtual void Run(
const std::vector<const InputBase*>& inputs,
37 detail::TableBase* ss, ShardFileMap* out_files) = 0;
// Pure virtual: must be overridden by concrete executors.
// NOTE(review): presumably asks an in-progress Run() to terminate — confirm.
40 virtual void Stop() = 0;
// Read-only accessor: returns a const reference to the freq_maps_ member
// (no copy is made; the reference is to this object's own member).
42 const RawContext::FreqMapRegistry& GetFreqMaps()
const {
return freq_maps_; }
// Read-only accessor: returns a const reference to the metric_map_ member
// (no copy is made; the reference is to this object's own member).
43 const MetricMap& GetCounterMap()
const {
return metric_map_; }
// NOTE(review): the enclosing struct header is not visible in this listing;
// these fields look like the per-IO-thread state (cf. PerIoStruct / per_io_)
// — confirm.
// Collection of Boost fibers held by this struct.
48 std::vector<::boost::fibers::fiber> process_fd;
// Exclusively owned RawContext instance.
49 std::unique_ptr<RawContext> raw_context;
// Raw pointer, null by default.
// NOTE(review): presumably points at a records-read counter owned elsewhere
// — confirm who sets it and its lifetime.
51 long *records_read_ptr =
nullptr;
// Flag that defaults to false.
// NOTE(review): presumably set to cut processing short — confirm.
52 bool stop_early =
false;
// Returns a util::VarzValue::Map by value.
// NOTE(review): presumably a snapshot of executor statistics — confirm what
// entries it carries in the definition.
64 util::VarzValue::Map GetStats();
// Stores the `is_binary` flag and `file_name` into the per-fiber state of
// `context` (context->per_fiber_). Both `context` and its per_fiber_ are
// dereferenced without a null check, so they must be valid at the call.
66 static void SetFileName(
bool is_binary,
const std::string& file_name,
RawContext* context) {
67 context->per_fiber_->is_binary = is_binary;
68 context->per_fiber_->file_name = file_name;
// Static helper that is only declared here; its definition is not in this
// listing. Takes the input FileSpec `fs` by const reference and a mutable
// RawContext pointer.
// NOTE(review): presumably copies metadata from `fs` into
// context->per_fiber_, like the sibling setters — confirm.
71 static void SetMetaData(
const pb::Input::FileSpec& fs,
RawContext* context);
// Records `pos` as the input position in the per-fiber state of `context`
// (context->per_fiber_->input_pos). `context` and its per_fiber_ are
// dereferenced without a null check, so they must be valid at the call.
73 static void SetPosition(
size_t pos,
RawContext* context) {
74 context->per_fiber_->input_pos = pos;
// Stores `shard` as the current shard in the per-fiber state of `context`.
// `shard` is taken by value and moved into current_shard, so the caller's
// argument is consumed only if the caller itself passes an rvalue.
77 static void SetCurrentShard(ShardId shard, RawContext* context) {
78 context->per_fiber_->current_shard = std::move(shard);
// Pure virtual hook that concrete executors must implement.
// NOTE(review): presumably invoked from Init() for subclass-specific setup
// — confirm in the definition of Init().
81 virtual void InitInternal() = 0;
// Atomic 64-bit counter, starts at zero.
// NOTE(review): presumably counts records that failed to parse — confirm.
89 std::atomic<uint64_t> parse_errors_{0};
// Registry exposed read-only through GetFreqMaps().
91 RawContext::FreqMapRegistry freq_maps_;
// Non-owning pointer to a registry that this object may not modify.
// Default-initialized to nullptr so that it never holds an indeterminate
// value between construction and the point where it is assigned.
// NOTE(review): presumably assigned in Init() from prev_maps — confirm.
92 const RawContext::FreqMapRegistry* finalized_maps_ = nullptr;
// One instance per thread (thread_local); owns its PerIoStruct.
94 static thread_local std::unique_ptr<PerIoStruct> per_io_;
Base class for operator executors.
void FinalizeContext(RawContext *context)
Called from all IO threads once they have finished running the operator.
A pool of IoContext objects, representing and managing CPU resources of the system.