7 #include <boost/asio/io_context.hpp> 8 #include <boost/asio/post.hpp> 12 #include "util/fibers/fibers_ext.h" 16 class IoFiberPropertiesMgr;
// Upper bound for a fiber's nice level.
static constexpr unsigned MAX_NICE_LEVEL = 4;

// Count of nice levels: one more than MAX_NICE_LEVEL
// (levels presumably start at 0 — confirm against the scheduler).
static constexpr unsigned NUM_NICE_LEVELS = 1 + MAX_NICE_LEVEL;
25 IoFiberProperties(::boost::fibers::context* ctx) : fiber_properties(ctx), nice_(2) {}
27 unsigned nice_level()
const {
return nice_; }
// Declaration only; the definition lives outside this header section.
// NOTE(review): presumably updates the stored nice level to `p`, with `p`
// expected to be <= MAX_NICE_LEVEL — confirm against the definition.
34 void SetNiceLevel(
unsigned p);
36 void set_name(std::string nm) { name_ = std::move(nm); }
38 const std::string& name()
const {
return name_; }
44 uint64_t awaken_ts()
const {
return awaken_ts_; }
// Timestamps, both zero-initialized.
// NOTE(review): clock source and units are not visible in this section.
uint64_t resume_ts_ = 0;
uint64_t awaken_ts_ = 0;
// Submits the callable `f` to `cntx` through boost::asio::post.
// `f` is perfectly forwarded, so rvalue callables are moved into the handler.
// Nothing is returned to the caller.
56 template <
typename Func>
void Async(::boost::asio::io_context& cntx, Func&& f) {
57 ::boost::asio::post(cntx, std::forward<Func>(f));
// Hands `f` to `cntx` via Async and returns a value of type decltype(f())
// extracted from a detail::ResultMover.
// NOTE(review): several lines of this function are not visible here;
// presumably the caller waits on `done` until the posted callback has stored
// the result in `mover` — confirm against the full definition.
62 template <
typename Func>
63 auto Await(::boost::asio::io_context& cntx, Func&& f) -> decltype(f()) {
65 fibers_ext::Done done;
66 using ResultType = decltype(f());
67 detail::ResultMover<ResultType> mover;
// The callback captures `mover` by reference (default `&`), while `f` and
// `done` are captured by value.
70 Async(cntx, [&, f = std::forward<Func>(f), done]()
mutable {
76 return std::move(mover).get();
// Shorthand for the Boost.Asio io_context type.
85 using io_context = ::boost::asio::io_context;
// Pure virtual. AttachCancellable() invokes Run() inside a fiber that it
// launches for the attached object.
91 virtual void Run() = 0;
// Pure virtual.
// NOTE(review): presumably asks a running Run() to stop; the call site is not
// visible in this section — confirm.
92 virtual void Cancel() = 0;
95 IoContext() : context_ptr_(std::make_shared<io_context>()) {}
98 typedef std::shared_ptr<io_context> ptr_t;
102 io_context& raw_context() {
return *context_ptr_; }
// Member convenience wrapper: forwards `f` to asio_ext::Async using the
// io_context owned by this object.
104 template <
typename Func>
void Async(Func&& f) {
105 asio_ext::Async(*context_ptr_, std::forward<Func>(f));
// Submits, through Async, a task that constructs a boost fiber from `f` and
// `args` and immediately detaches it.
// `f` is forwarded into the lambda capture, while `args...` are captured by
// copy.
// NOTE(review): inside the lambda std::forward<Func>/std::forward<Args> are
// applied to the captured copies; when a template argument is deduced as an
// lvalue reference this passes an lvalue, so the fiber copies from the
// capture rather than moving — confirm this is intended.
108 template <
typename Func,
typename... Args>
void AsyncFiber(Func&& f, Args&&... args) {
112 Async([f = std::forward<Func>(f), args...]()
mutable {
113 ::boost::fibers::fiber(std::forward<Func>(f), std::forward<Args>(args)...).detach();
// Returns a value of type decltype(f()).
// Branches on InContextThread(); the body of that branch is not visible in
// this section. The other path delegates to asio_ext::Await on the owned
// io_context.
// NOTE(review): presumably the in-thread branch calls `f` directly — confirm.
119 template <
typename Func>
auto Await(Func&& f) -> decltype(f()) {
120 if (InContextThread()) {
123 return asio_ext::Await(*context_ptr_, std::forward<Func>(f));
// Constructs a boost fiber from `args` inside a callable passed to Await(),
// assigning it to the local `fb`, which the callable captures by reference.
// NOTE(review): the remainder of the function is not visible here;
// presumably `fb` is returned to the caller — confirm.
128 template <
typename... Args> boost::fibers::fiber LaunchFiber(Args&&... args) {
129 ::boost::fibers::fiber fb;
131 Await([&] { fb = boost::fibers::fiber(std::forward<Args>(args)...); });
// Returns a value of type decltype(f()).
// Outside the InContextThread() branch (whose body is not visible here), `f`
// is applied to a detail::ResultMover inside a fiber created with
// LaunchFiber(), and the result is then taken out of the mover.
// NOTE(review): the lines between launching the fiber and the return are not
// visible; presumably the fiber is joined before `mover` is read — confirm.
140 template <
typename Func>
auto AwaitSafe(Func&& f) -> decltype(f()) {
141 if (InContextThread()) {
145 using ResultType = decltype(f());
146 detail::ResultMover<ResultType> mover;
// The fiber body captures `mover` and `f` by reference.
147 auto fb = LaunchFiber([&] { mover.Apply(std::forward<Func>(f)); });
150 return std::move(mover).get();
153 auto get_executor() {
return context_ptr_->get_executor(); }
155 bool InContextThread()
const {
return std::this_thread::get_id() == thread_id_; }
// Launches a fiber that calls obj->Run(), then appends to cancellable_arr_ a
// pair holding `obj` wrapped in a std::unique_ptr together with that fiber.
// From that point on this object owns `obj`.
// NOTE(review): ownership is only taken after LaunchFiber() returns; if
// LaunchFiber() can throw, `obj` would not be freed — confirm whether that
// matters for callers.
162 void AttachCancellable(Cancellable* obj) {
163 auto fb = LaunchFiber([obj] { obj->Run(); });
164 cancellable_arr_.emplace_back(
165 CancellablePair{std::unique_ptr<Cancellable>(obj), std::move(fb)});
// Declaration only; defined outside this header section.
// NOTE(review): presumably runs the io_context loop and uses `bc` to signal
// the caller once the loop is up — confirm against the definition.
169 void StartLoop(fibers_ext::BlockingCounter* bc);
// An owned Cancellable paired with a boost fiber; AttachCancellable() stores
// the fiber that runs the object's Run() here.
171 using CancellablePair = std::pair<std::unique_ptr<Cancellable>, ::boost::fibers::fiber>;
// Thread id that InContextThread() compares the calling thread against.
174 std::thread::id thread_id_;
// Objects registered through AttachCancellable(), each with its fiber.
175 std::vector<CancellablePair> cancellable_arr_;
// A pool of IoContext objects, representing and managing CPU resources of the system.
uint64_t resume_ts() const