4 #include "util/asio/accept_server.h" 6 #include <boost/fiber/mutex.hpp> 8 #include "base/logging.h" 9 #include "base/walltime.h" 10 #include "util/asio/io_context_pool.h" 11 #include "util/asio/yield.h" 12 #include "util/fibers/fibers_ext.h" 16 using namespace boost;
// Singly-linked list of ConnectionHandler objects that are chained through
// ConnectionHandler::member_hook_t. Both the constant_time_size and the
// cache_last options are switched off.
20 using ListType = detail::slist<ConnectionHandler, ConnectionHandler::member_hook_t,
21 detail::constant_time_size<false>, detail::cache_last<false>>;
// Builds the per-listener state: keeps a reference to the IO context, creates an
// acceptor on that context's raw asio context using the endpoint's protocol, and
// stores the listener interface pointer.
//   ep         - endpoint to bind the acceptor to.
//   io_context - context that owns the acceptor; dereferenced, so it must be non-null.
//   si         - listener interface associated with this acceptor.
24 AcceptServer::ListenerWrapper::ListenerWrapper(
const endpoint& ep, IoContext* io_context,
25 ListenerInterface* si)
26 : io_context(*io_context), acceptor(io_context->raw_context(), ep.protocol()), listener(si) {
// Enable address reuse on the acceptor before binding.
27 acceptor.set_option(asio::socket_base::reuse_address(
true));
// Bind using the non-throwing overload; a bind failure aborts via CHECK with the
// error code, its message and the endpoint.
29 system::error_code ec;
30 acceptor.bind(ep, ec);
31 CHECK(!ec) << ec <<
"/" << ec.message() <<
" for port " << ep;
// Record the port the acceptor is actually bound to, as reported by the socket
// (presumably this matters when ep requested port 0 — TODO confirm).
34 port = acceptor.local_endpoint().port();
// Creates a server on top of `pool`. A signal set for SIGINT and SIGTERM is created
// on the next context handed out by the pool, and ref_bc_ starts with a count of 1.
37 AcceptServer::AcceptServer(IoContextPool* pool)
38 : pool_(pool), signals_(pool->GetNextContext().raw_context(), SIGINT, SIGTERM), ref_bc_(1) {
// Signal handler. It does not close acceptors directly; each close is posted to
// the executor of the acceptor it belongs to.
41 auto non_blocking_cb = [
this](system::error_code ec,
int ) {
45 VLOG(1) <<
"Signal with ec " << ec <<
" " << ec.message();
// Ask every acceptor that is still open to close itself on its own executor.
46 for (
auto& l : listeners_) {
47 if (l.acceptor.is_open()) {
48 asio::post(l.acceptor.get_executor(), [acc = &l.acceptor]()
mutable { acc->close(); });
// Only when the wait completed without an error and a hook is set: run the
// user-supplied break hook in its own detached fiber.
54 if (!ec && on_break_hook_) {
55 fibers::fiber{on_break_hook_}.detach();
// Arm the signal set with the handler above.
61 signals_.async_wait(non_blocking_cb);
// Destructor of AcceptServer.
64 AcceptServer::~AcceptServer() {
// Registers a new listener on the given TCP port.
//   port - port for the IPv4 endpoint the listener is bound to.
//   si   - listener interface; it is given the server's pool via RegisterPool().
// Returns an unsigned short (presumably the bound port stored in the new
// ListenerWrapper — TODO confirm against the return statement).
69 unsigned short AcceptServer::AddListener(
unsigned short port, ListenerInterface* si) {
// Let the listener know which IO context pool it works with.
75 si->RegisterPool(pool_);
// IPv4 endpoint on the requested port; the acceptor lives on the next context
// handed out by the pool.
77 tcp::endpoint endpoint(tcp::v4(), port);
78 IoContext& io_context = pool_->GetNextContext();
// Constructing the ListenerWrapper binds the acceptor (see its constructor).
79 listeners_.emplace_back(endpoint, &io_context, si);
80 auto& listener = listeners_.back();
// Log the port recorded by the wrapper rather than the requested one.
82 LOG(INFO) <<
"AcceptServer - listening on port " << listener.port;
// Accept loop for a single listener. Must be called on the thread of the wrapper's
// IO context (enforced by the CHECK below). Accepts connections, tracks each handler
// in a shared list, starts it on its own context, and on exit closes the tracked
// connections around the listener's PreShutdown()/PostShutdown() calls.
87 void AcceptServer::AcceptInIOThread(ListenerWrapper* wrapper) {
88 CHECK(wrapper->io_context.InContextThread());
// Configure the current fiber: nice level MAX_NICE_LEVEL - 1 and the name "AcceptLoop".
89 auto& fiber_props = this_fiber::properties<IoFiberProperties>();
90 fiber_props.SetNiceLevel(IoFiberProperties::MAX_NICE_LEVEL - 1);
91 fiber_props.set_name(
"AcceptLoop");
// Condition variable that is notified once the connection list has become empty
// (see the notify_one() call in the cleanup callback below).
95 fibers::condition_variable_any clist_empty_cnd;
// Blocks on the condition variable until the connection list is empty.
// `lk` must be the lock that guards the list.
98 void wait(std::unique_lock<fibers::mutex>& lk) {
99 clist_empty_cnd.wait(lk, [&] {
return clist.empty(); });
// The list state is held through a shared_ptr so that cleanup fibers can keep it
// alive; they capture clist_ptr by value.
103 std::shared_ptr<SharedCList> clist_ptr = std::make_shared<SharedCList>();
105 system::error_code ec;
// Cleanup callback invoked after a connection has finished running. It hops to the
// accept context by starting a fiber there, takes the list mutex, and wakes up the
// waiter when the list is empty.
111 auto clean_cb = [&, &accpt_cntxt = wrapper->io_context](ConnectionHandler::ptr_t p) {
112 accpt_cntxt.AsyncFiber([&, clist_ptr, p = std::move(p)]()
mutable {
113 std::lock_guard<fibers::mutex> lk(clist_ptr->mu);
118 if (clist_ptr->clist.empty()) {
119 clist_ptr->clist_empty_cnd.notify_one();
// Wait for the next connection; yields a handler pointer and an error code.
126 std::tie(handler, ec) = AcceptConnection(wrapper);
// Time between the fiber's awaken timestamp and now; logged when it exceeds 500.
// NOTE(review): the variable is named delay_msec but is computed from
// GetMonotonicMicrosFast(), so the unit looks like microseconds, and the log
// message prints no unit — confirm and consider renaming.
127 auto delay_msec = base::GetMonotonicMicrosFast() - fiber_props.awaken_ts();
128 LOG_IF(INFO, delay_msec > 500) <<
"Had " << delay_msec <<
" accepting connection";
// try_again is what AcceptConnection reports when accept succeeded but the socket
// is not open; it is tested separately from the other errors.
132 if (ec == asio::error::try_again)
// Any error other than operation_canceled is logged.
134 LOG_IF(INFO, ec != std::errc::operation_canceled) <<
"Stopped with error " << ec.message();
// A successful accept must have produced a handler; track it at the front of the list.
137 CHECK_NOTNULL(handler);
138 clist_ptr->clist.push_front(*handler);
140 DCHECK(!clist_ptr->clist.empty());
141 DCHECK(handler->hook_.is_linked());
// Run the connection in a new fiber on the handler's own context. When
// RunInIOThread() returns, the handler is passed to clean_cb.
// NOTE(review): this lambda captures by reference (including clean_cb), so the
// enclosing frame presumably has to outlive all connection fibers — confirm that
// the wait on the empty list guarantees this.
145 handler->context().AsyncFiber(
146 [&](ConnectionHandler::ptr_t conn_ptr) {
147 conn_ptr->RunInIOThread();
148 clean_cb(std::move(conn_ptr));
// Exceptions derived from std::exception are logged as warnings.
152 }
catch (std::exception
const& ex) {
153 LOG(WARNING) <<
": caught exception : " << ex.what();
// Shutdown sequence: notify the listener before connections are closed.
156 wrapper->listener->PreShutdown();
158 if (!clist_ptr->clist.empty()) {
159 VLOG(1) <<
"Starting closing connections";
// Walk the list under the list mutex.
162 std::unique_lock<fibers::mutex> lk(clist_ptr->mu);
163 auto it = clist_ptr->clist.begin();
166 while (it != clist_ptr->clist.end()) {
// Hold a ptr_t to the current handler while it is being processed.
169 ConnectionHandler::ptr_t guard(&*it);
179 VLOG(1) <<
"Closed " << cnt <<
" connections";
// Notify the listener once connection closing is done.
186 wrapper->listener->PostShutdown();
188 LOG(INFO) <<
"Accept server stopped for port " << wrapper->port;
// Accepts one connection on the wrapper's acceptor and wraps it in a
// ConnectionHandler created by the listener.
// Returns an AcceptResult built from (handler, error code); the early-return path
// carries a nullptr handler together with the error.
195 auto AcceptServer::AcceptConnection(ListenerWrapper* wrapper) -> AcceptResult {
// The new socket is created on the next context from the pool, which need not be
// the context the acceptor runs on.
196 IoContext& io_cntx = pool_->GetNextContext();
198 system::error_code ec;
199 tcp::socket sock(io_cntx.raw_context());
// Asynchronous accept with the fiber yield token; the outcome is stored in ec
// (presumably the calling fiber is suspended until completion — TODO confirm).
201 wrapper->acceptor.async_accept(sock, fibers_ext::yield[ec]);
// A "successful" accept that left the socket closed is reported as try_again.
202 if (!ec && !sock.is_open())
203 ec = asio::error::try_again;
205 return AcceptResult(
nullptr, ec);
206 DCHECK(sock.is_open()) << sock.native_handle();
207 VLOG(1) <<
"Accepted socket " << sock.remote_endpoint() <<
"/" << sock.native_handle();
// The listener creates the handler for the chosen context; the socket is moved into it.
209 ConnectionHandler* conn = wrapper->listener->NewConnection(io_cntx);
210 conn->Init(std::move(sock));
212 return AcceptResult(conn, ec);
// Starts the accept loops: one AcceptInIOThread fiber per registered listener.
215 void AcceptServer::Run() {
216 if (!listeners_.empty()) {
// Increase ref_bc_ by the number of listeners.
217 ref_bc_.Add(listeners_.size());
219 for (
auto& listener : listeners_) {
220 ListenerWrapper* ptr = &listener;
// Post to the listener's own raw context so that the fiber is created there;
// AcceptInIOThread CHECKs that it runs on that context's thread.
221 io_context& io_cntx = listener.io_context.raw_context();
222 asio::post(io_cntx, [
this, ptr] {
223 fibers::fiber srv_fb(&AcceptServer::AcceptInIOThread,
this, ptr);
// Presumably blocks until the accept loops started by Run() have finished — TODO confirm.
231 void AcceptServer::Wait() {
234 VLOG(1) <<
"AcceptServer::Wait completed";
// Aborts if listeners were added; the message tells the caller that Run() must be
// called after adding listeners.
236 CHECK(listeners_.empty()) <<
"Must Call AcceptServer::Run() after adding listeners";