yield.hpp
1 // Copyright Oliver Kowalke, Nat Goodspeed 2015.
2 // Distributed under the Boost Software License, Version 1.0.
3 // (See accompanying file LICENSE_1_0.txt or copy at
4 // http://www.boost.org/LICENSE_1_0.txt)
5 
6 #pragma once
7 
8 #include <boost/asio/async_result.hpp>
9 #include <boost/asio/detail/config.hpp>
10 #include <boost/assert.hpp>
11 #include <boost/atomic.hpp>
12 #include <boost/intrusive_ptr.hpp>
13 #include <boost/system/error_code.hpp>
14 #include <boost/system/system_error.hpp>
15 #include <boost/throw_exception.hpp>
16 
17 #include <boost/fiber/mutex.hpp>
18 
19 #include <mutex> // std::unique_lock
20 
21 namespace util {
22 namespace fibers_ext {
23 namespace detail {
24 
25 namespace fbs = ::boost::fibers;
26 
27 //[fibers_asio_yield_completion
// Bundle a completion state flag (init / waiting / complete) with a spinlock
// to protect it.
30  enum state_t { init, waiting, complete };
31 
32  typedef fbs::detail::spinlock mutex_t;
33  typedef std::unique_lock<mutex_t> lock_t;
34  typedef boost::intrusive_ptr<yield_completion> ptr_t;
35 
36  std::atomic<std::size_t> use_count_{0};
37  mutex_t mtx_{};
38  state_t state_{init};
39 
40  void wait() {
41  // yield_handler_base::operator()() will set state_ `complete` and
42  // attempt to wake a suspended fiber. It would be Bad if that call
43  // happened between our detecting (complete != state_) and suspending.
44  lock_t lk{mtx_};
45  // If state_ is already set, we're done here: don't suspend.
46  if (complete != state_) {
47  state_ = waiting;
48  // suspend(unique_lock<spinlock>) unlocks the lock in the act of
49  // resuming another fiber
50  fbs::context::active()->suspend(lk);
51  }
52  }
53 
54  friend void intrusive_ptr_add_ref(yield_completion* yc) noexcept {
55  BOOST_ASSERT(nullptr != yc);
56  yc->use_count_.fetch_add(1, std::memory_order_relaxed);
57  }
58 
59  friend void intrusive_ptr_release(yield_completion* yc) noexcept {
60  BOOST_ASSERT(nullptr != yc);
61  if (1 == yc->use_count_.fetch_sub(1, std::memory_order_release)) {
62  std::atomic_thread_fence(std::memory_order_acquire);
63  delete yc;
64  }
65  }
66 };
67 //]
68 
69 //[fibers_asio_yield_handler_base
70 // This class encapsulates common elements between yield_handler<T> (capturing
71 // a value to return from asio async function) and yield_handler<void> (no
72 // such value). See yield_handler<T> and its <void> specialization below. Both
73 // yield_handler<T> and yield_handler<void> are passed by value through
74 // various layers of asio functions. In other words, they're potentially
75 // copied multiple times. So key data such as the yield_completion instance
76 // must be stored in our async_result<yield_handler<>> specialization, which
77 // should be instantiated only once.
79  public:
80  yield_handler_base(yield_t const& y)
81  : // capture the context* associated with the running fiber
82  ctx_{fbs::context::active()},
83  // capture the passed yield_t
84  yt_(y) {}
85 
86  // completion callback passing only (error_code)
87  void operator()(boost::system::error_code const& ec) {
88  BOOST_ASSERT_MSG(ycomp_,
89  "Must inject yield_completion* "
90  "before calling yield_handler_base::operator()()");
91  BOOST_ASSERT_MSG(yt_.ec_,
92  "Must inject boost::system::error_code* "
93  "before calling yield_handler_base::operator()()");
94  // If originating fiber is busy testing state_ flag, wait until it
95  // has observed (completed != state_).
96  yield_completion::lock_t lk{ycomp_->mtx_};
97  yield_completion::state_t state = ycomp_->state_;
98  // Notify a subsequent yield_completion::wait() call that it need not
99  // suspend.
100  ycomp_->state_ = yield_completion::complete;
101  // set the error_code bound by yield_t
102  *yt_.ec_ = ec;
103  // unlock the lock that protects state_
104  lk.unlock();
105  // If ctx_ is still active, e.g. because the async operation
106  // immediately called its callback (this method!) before the asio
107  // async function called async_result_base::get(), we must not set it
108  // ready.
109  if (yield_completion::waiting == state) {
110  // wake the fiber
111  fbs::context::active()->schedule(ctx_);
112  }
113  }
114 
115  void bind(yield_completion::ptr_t ptr, boost::system::error_code* ec) {
116  // if yield_t didn't bind an error_code, make yield_handler_base's
117  // error_code* point to an error_code local to this object so
118  // yield_handler_base::operator() can unconditionally store through
119  // its error_code*
120  ycomp_ = std::move(ptr);
121  if (!yt_.ec_) {
122  yt_.ec_ = ec;
123  }
124  }
125 
126  private:
127  boost::fibers::context* ctx_;
128  yield_t yt_;
129  // We depend on this pointer to yield_completion, which will be injected
130  // by async_result.
131  yield_completion::ptr_t ycomp_{};
132 };
133 //]
134 
135 //[fibers_asio_yield_handler_T
136 // asio uses handler_type<completion token type, signature>::type to decide
137 // what to instantiate as the actual handler. Below, we specialize
138 // handler_type< yield_t, ... > to indicate yield_handler<>. So when you pass
139 // an instance of yield_t as an asio completion token, asio selects
140 // yield_handler<> as the actual handler class.
141 template <typename T> class yield_handler : public yield_handler_base {
142  friend class ::boost::asio::async_result<util::fibers_ext::yield_t,
143  void(boost::system::error_code, T)>;
144 
145  public:
146  // asio passes the completion token to the handler constructor
147  explicit yield_handler(yield_t const& y) : yield_handler_base{y} {}
148 
149  // completion callback passing only value (T)
150  void operator()(T t) {
151  // just like callback passing success error_code
152  (*this)(boost::system::error_code(), std::move(t));
153  }
154 
155  // completion callback passing (error_code, T)
156  void operator()(boost::system::error_code const& ec, T t) {
157  BOOST_ASSERT_MSG(value_,
158  "Must inject value ptr "
159  "before caling yield_handler<T>::operator()()");
160  // move the value to async_result<> instance BEFORE waking up a
161  // suspended fiber
162  *value_ = std::move(t);
163  // forward the call to base-class completion handler
164  yield_handler_base::operator()(ec);
165  }
166 
167  private:
168  // pointer to destination for eventual value
169  // this must be injected by async_result before operator()() is called
170  T* value_{nullptr};
171 };
172 //]
173 
174 //[fibers_asio_yield_handler_void
175 // yield_handler<void> is like yield_handler<T> without value_. In fact it's
176 // just like yield_handler_base.
177 template <> class yield_handler<void> : public yield_handler_base {
178  public:
179  explicit yield_handler(yield_t const& y) : yield_handler_base{y} {}
180 
181  // nullary completion callback
182  void operator()() { (*this)(boost::system::error_code()); }
183 
184  // inherit operator()(error_code) overload from base class
185  using yield_handler_base::operator();
186 };
187 //]
188 
189 // Specialize asio_handler_invoke hook to ensure that any exceptions thrown
190 // from the handler are propagated back to the caller
191 // template <typename Fn, typename T> void asio_handler_invoke(Fn&& fn, yield_handler<T>*) { fn(); }
192 
193 //[fibers_asio_async_result_base
194 // Factor out commonality between async_result<yield_handler<T>> and
195 // async_result<yield_handler<void>>
197  public:
198  explicit async_result_base(yield_handler_base& h) : ycomp_{new yield_completion{}} {
199  // Inject ptr to our yield_completion instance into this
200  // yield_handler<>.
201  h.bind(ycomp_, &ec_);
202  }
203 
204  void get() {
205  // Unless yield_handler_base::operator() has already been called,
206  // suspend the calling fiber until that call.
207  ycomp_->wait();
208  // The only way our own ec_ member could have a non-default value is
209  // if our yield_handler did not have a bound error_code AND the
210  // completion callback passed a non-default error_code.
211  if (ec_) {
212  boost::throw_exception(boost::system::system_error{ec_});
213  }
214  }
215 
216  private:
217  // If yield_t does not bind an error_code instance, store into here.
218  boost::system::error_code ec_{};
219  yield_completion::ptr_t ycomp_;
220 };
221 //]
222 
223 } // namespace detail
224 } // namespace fibers_ext
225 } // namespace util
226 
227 namespace boost {
228 namespace asio {
229 
230 //[fibers_asio_async_result_T
231 // asio constructs an async_result<> instance from the yield_handler specified
232 // by handler_type<>::type. A particular asio async method constructs the
233 // yield_handler, constructs this async_result specialization from it, then
234 // returns the result of calling its get() method.
235 template <typename T>
236 class async_result<util::fibers_ext::yield_t, void(boost::system::error_code, T)>
238  public:
239  // type returned by get()
240  typedef T return_type;
242 
243  explicit async_result(completion_handler_type& h)
245  // Inject ptr to our value_ member into yield_handler<>: result will
246  // be stored here.
247  h.value_ = &value_;
248  }
249 
250  // asio async method returns result of calling get()
251  return_type get() {
252  util::fibers_ext::detail::async_result_base::get();
253  return std::move(value_);
254  }
255 
256  private:
257  return_type value_{};
258 };
259 //]
260 
261 //[fibers_asio_async_result_void
// Without the need to handle a passed value, our async_result specialization
// for void(error_code) is just like async_result_base.
264 template <>
265 class async_result<util::fibers_ext::yield_t, void(boost::system::error_code)>
267  public:
268  typedef void return_type;
270 
271  explicit async_result(completion_handler_type& h)
273 };
274 //]
275 
276 } // namespace asio
277 } // namespace boost