periodic_task.h
1 // Copyright 2018, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 #pragma once
5 
6 #include <thread>
7 #include <boost/asio/steady_timer.hpp>
8 #include <boost/fiber/condition_variable.hpp>
9 #include "util/asio/io_context.h"
10 
11 namespace util {
12 
13 // Single threaded but fiber friendly PeriodicTask. Runs directly from IO fiber therefore
14 // should run only cpu, non-blocking tasks which should not block the calling fiber.
15 // 'Cancel' may block the calling fiber until the scheduled callback finished running.
16 class PeriodicTask {
17  enum {ALARMED = 0x1, SHUTDOWN = 0x2};
18 
19  public:
20  using timer_t = ::boost::asio::steady_timer;
21  using duration_t = timer_t::duration;
22  using error_code = boost::system::error_code;
23 
24  PeriodicTask(IoContext& cntx, duration_t d) : timer_(cntx.raw_context()), d_(d), state_(0) {}
25  PeriodicTask(PeriodicTask&&) = default;
26 
27  ~PeriodicTask() { Cancel(); }
28 
29  // f must be non-blocking function because it runs directly in IO fiber.
30  // f should accept 'ticks' argument that says by how many duration cycles we are late.
31  // Usually ticks=1.
32  // For example if we started at time 0, with duration=1s and on n-th invocation we measured
33  // "n+1" seconds, then f(2) will be called. The system is self-balancing so for next invocation
34  // at (n+2) seconds, we will pass f(1) again.
35  template<typename Func> void Start(Func&& f) {
36  Alarm();
37  RunInternal(std::forward<Func>(f));
38  }
39 
40  // Cancels the task and blocks until all the callbacks finished to run.
41  // Since it blocks - it should not run from IO fiber.
42  void Cancel();
43 
44  private:
45  void Alarm();
46 
47  template<typename Func> void RunInternal(Func&& f) {
48  timer_.async_wait([this, f = std::forward<Func>(f)] (const error_code& ec) mutable {
49  if (ec == boost::asio::error::operation_aborted || (state_ & SHUTDOWN)) {
50  Disalarm();
51  return;
52  }
53  duration_t real_d = timer_t::clock_type::now() - last_;
54 
55  // for each function invocation we pass at least 1 tick.
56  int ticks = std::max<int>(1, real_d / d_);
57  f(ticks);
58  last_ += d_ * ticks;
59 
60  // due to max() rounding, last_ will self balance itself to be close to clock_type::now().
61  timer_.expires_at(last_ + d_);
62  RunInternal(std::forward<Func>(f));
63  });
64  }
65 
66  void Disalarm() {
67  state_ &= ~uint8_t(ALARMED);
68  }
69 
70  timer_t timer_;
71  duration_t d_;
72  timer_t::time_point last_;
73  uint8_t state_ ;
74 };
75 
76 
77 // Each tasks runs in a new thread, thus not blocking the IO fiber. The next invocation of the
78 // task will skip the run if the previous has finished.
80  public:
81  struct Options {
82  std::string name;
83 
84  // how many times this task can be skipped before reaching error state.
85  // Provide kuint32max number for allowing infinite number of skips.
86  // By default we do not allow skipping tasks.
87  uint32_t skip_run_margin;
88 
89  Options() : skip_run_margin(0) {}
90  };
91 
92  PeriodicWorkerTask(IoContext& cntx, PeriodicTask::duration_t d,
93  const Options& opts = Options{}) : pt_(cntx, d), opts_(opts) {}
94 
95  ~PeriodicWorkerTask() { Cancel(); }
96 
97  template<typename Func> void Start(Func&& f) {
98  pt_.Start([this, f = PackagedTask(std::forward<Func>(f))] (int ticks) {
99  if (AllowRunning()) {
100  std::thread(f).detach();
101  } else {
102  HandleSkipRun();
103  }
104  });
105  }
106 
107  void Cancel();
108  bool IsHanging() const { return is_hanging_; }
109 
110  private:
111  template<typename Func> auto PackagedTask(Func&& f) {
112  return [this, f = std::forward<Func>(f)]() {
113  ResetErrorState();
114  f();
115  Epilog();
116  };
117  }
118 
119  // Enters running state if possible, returns if succeeded.
120  bool AllowRunning() {
121  bool val = false;
122  return is_running_.compare_exchange_strong(val, true);
123  }
124 
125  void ResetErrorState();
126  void Epilog();
127  void HandleSkipRun();
128 
129  std::atomic_bool is_running_{false};
130 
131  ::boost::fibers::mutex m_;
132  ::boost::fibers::condition_variable_any cond_;
133  PeriodicTask pt_;
134  Options opts_;
135 
136  bool is_hanging_ = false;
137  unsigned number_skips_ = 0;
138 };
139 
140 } // namespace util
141