10 #include <absl/types/optional.h> 11 #include <boost/asio/executor_work_guard.hpp> 12 #include <boost/fiber/fiber.hpp> 14 #include "base/type_traits.h" 15 #include "util/asio/io_context.h" 16 #include "util/fibers/fibers_ext.h" 27 template <
typename Func,
typename... Args>
28 using AcceptArgsCheck =
29 typename std::enable_if<base::is_invocable<Func, Args...>::value,
int>::type;
// Shorthand for the Boost.Asio io_context type.
32 using io_context = ::boost::asio::io_context;
// Constructs a pool. Both arguments are optional: `pool_size` defaults to 0
// and `cpus` to an empty list. The constructor is defined out of line, so how
// those defaults are interpreted is not visible here.
// NOTE(review): presumably 0 means "choose the size automatically" and `cpus`
// lists CPU ids for the pool's threads - confirm against the definition.
39 explicit IoContextPool(std::size_t pool_size = 0, std::vector<size_t> cpus = {});
56 IoContext& operator[](
size_t i) {
return context_arr_[i]; }
57 IoContext& at(
size_t i) {
return context_arr_[i]; }
59 size_t size()
const {
return context_arr_.size(); }
67 template <
typename Func, AcceptArgsCheck<Func, IoContext&> = 0>
void AsyncOnAll(Func&& func) {
69 for (
unsigned i = 0; i < size(); ++i) {
73 context.Async([&context, func] ()
mutable { func(context); });
83 template <
typename Func, AcceptArgsCheck<Func,
unsigned, IoContext&> = 0>
86 for (
unsigned i = 0; i < size(); ++i) {
89 context.Async([&context, i, func] ()
mutable { func(i, context); });
99 template <
typename Func, AcceptArgsCheck<Func, IoContext&> = 0>
void AwaitOnAll(Func&& func) {
101 auto cb = [func = std::forward<Func>(func), bc](
IoContext& context)
mutable {
114 template <
typename Func, AcceptArgsCheck<Func,
unsigned, IoContext&> = 0>
117 auto cb = [func = std::forward<Func>(func), bc](
unsigned index,
IoContext& context)
mutable {
118 func(index, context);
133 template <
typename Func, AcceptArgsCheck<Func,
unsigned, IoContext&> = 0>
void AsyncFiberOnAll(Func&& func) {
135 ::boost::fibers::fiber(func, i, std::ref(context)).detach();
147 template <
typename Func, AcceptArgsCheck<Func, IoContext&> = 0>
void AsyncFiberOnAll(Func&& func) {
149 ::boost::fibers::fiber(func, std::ref(context)).detach();
161 template <
typename Func, AcceptArgsCheck<Func,
unsigned, IoContext&> = 0>
164 auto cb = [func = std::forward<Func>(func), bc](
unsigned i,
IoContext& context)
mutable {
180 template <
typename Func, AcceptArgsCheck<Func, IoContext&> = 0>
void AwaitFiberOnAll(Func&& func) {
182 auto cb = [func = std::forward<Func>(func), bc](
IoContext& context)
mutable {
199 boost::fibers::mutex mu;
201 std::lock_guard<boost::fibers::mutex> lk(mu);
202 func(std::forward<decltype(args)>(args));
210 void CheckRunningState();
212 typedef ::boost::asio::executor_work_guard<IoContext::io_context::executor_type> work_guard_t;
214 std::vector<size_t> cpu_idx_arr_;
215 std::vector<IoContext> context_arr_;
218 absl::optional<work_guard_t> work;
221 std::vector<TInfo> thread_arr_;
224 std::atomic_uint_fast32_t next_io_context_{0};
225 thread_local
static size_t context_indx_;
226 enum State { STOPPED, RUN } state_ = STOPPED;
void AsyncFiberOnAll(Func &&func)
Runs func in a fiber asynchronously. func must accept IoContext&. func may fiber-block.
void AwaitOnAll(Func &&func)
Runs the function in all IO threads asynchronously. Blocks until all the asynchronous calls return.
void AwaitFiberOnAll(Func &&func)
Runs func wrapped in fiber on all IO threads in parallel. func must accept IoContext&....
void Stop()
Stops all io_context objects in the pool.
IoContext & GetNextContext()
Get an io_context to use. Thread-safe.
void AwaitFiberOnAllSerially(Func &&func)
Runs func wrapped in fiber on all IO threads, SERIALLY. func must accept IoContext&....
void Run()
Starts running all IoContext objects in the pool. Does not block.
void AsyncOnAll(Func &&func)
Runs func in all IO threads asynchronously.
A pool of IoContext objects, representing and managing CPU resources of the system.