accept_server.cc
1 // Copyright 2018, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 #include "util/asio/accept_server.h"
5 
6 #include <boost/fiber/mutex.hpp>
7 
8 #include "base/logging.h"
9 #include "base/walltime.h"
10 #include "util/asio/io_context_pool.h"
11 #include "util/asio/yield.h"
12 #include "util/fibers/fibers_ext.h"
13 
14 namespace util {
15 
16 using namespace boost;
17 using asio::ip::tcp;
18 
19 // auto_unlink requires cache_last,constant_time_size = false.
20 using ListType = detail::slist<ConnectionHandler, ConnectionHandler::member_hook_t,
21  detail::constant_time_size<false>, detail::cache_last<false>>;
22 
23 
24 AcceptServer::ListenerWrapper::ListenerWrapper(const endpoint& ep, IoContext* io_context,
25  ListenerInterface* si)
26  : io_context(*io_context), acceptor(io_context->raw_context(), ep.protocol()), listener(si) {
27  acceptor.set_option(asio::socket_base::reuse_address(true));
28 
29  system::error_code ec;
30  acceptor.bind(ep, ec);
31  CHECK(!ec) << ec << "/" << ec.message() << " for port " << ep;
32 
33  acceptor.listen();
34  port = acceptor.local_endpoint().port();
35 }
36 
37 AcceptServer::AcceptServer(IoContextPool* pool)
38  : pool_(pool), signals_(pool->GetNextContext().raw_context(), SIGINT, SIGTERM), ref_bc_(1) {
39 
40  // This cb function should not block.
41  auto non_blocking_cb = [this](system::error_code ec, int /*signo*/) {
42  // The server is stopped by cancelling all outstanding asynchronous
43  // operations. Once all operations have finished the io_context::run()
44  // call will exit.
45  VLOG(1) << "Signal with ec " << ec << " " << ec.message();
46  for (auto& l : listeners_) {
47  if (l.acceptor.is_open()) {
48  asio::post(l.acceptor.get_executor(), [acc = &l.acceptor]() mutable { acc->close(); });
49  }
50  }
51 
52  // non_blocking_cb is also triggerred during the normal shutdown flow.
53  // In that case we should not call on_break_hook_.
54  if (!ec && on_break_hook_) {
55  fibers::fiber{on_break_hook_}.detach();
56  }
57 
58  ref_bc_.Dec();
59  };
60 
61  signals_.async_wait(non_blocking_cb);
62 }
63 
64 AcceptServer::~AcceptServer() {
65  Stop();
66  Wait();
67 }
68 
69 unsigned short AcceptServer::AddListener(unsigned short port, ListenerInterface* si) {
70  CHECK(si);
71 
72  // We can not allow dynamic listener additions because listeners_ might reallocate.
73  CHECK(!was_run_);
74 
75  si->RegisterPool(pool_);
76 
77  tcp::endpoint endpoint(tcp::v4(), port);
78  IoContext& io_context = pool_->GetNextContext();
79  listeners_.emplace_back(endpoint, &io_context, si);
80  auto& listener = listeners_.back();
81 
82  LOG(INFO) << "AcceptServer - listening on port " << listener.port;
83 
84  return listener.port;
85 }
86 
87 void AcceptServer::AcceptInIOThread(ListenerWrapper* wrapper) {
88  CHECK(wrapper->io_context.InContextThread());
89  auto& fiber_props = this_fiber::properties<IoFiberProperties>();
90  fiber_props.SetNiceLevel(IoFiberProperties::MAX_NICE_LEVEL - 1);
91  fiber_props.set_name("AcceptLoop");
92 
93  struct SharedCList {
94  ListType clist;
95  fibers::condition_variable_any clist_empty_cnd;
96  fibers::mutex mu;
97 
98  void wait(std::unique_lock<fibers::mutex>& lk) {
99  clist_empty_cnd.wait(lk, [&] { return clist.empty(); });
100  }
101  };
102 
103  std::shared_ptr<SharedCList> clist_ptr = std::make_shared<SharedCList>();
104 
105  system::error_code ec;
106  util::ConnectionHandler* handler = nullptr;
107 
108  // We release intrusive pointer in our thread by delegating the code to accpt_cntxt.
109  // Please note that since we update clist in the same thread, we do not need mutex
110  // to protect the state.
111  auto clean_cb = [&, &accpt_cntxt = wrapper->io_context](ConnectionHandler::ptr_t p) {
112  accpt_cntxt.AsyncFiber([&, clist_ptr, p = std::move(p)]() mutable {
113  std::lock_guard<fibers::mutex> lk(clist_ptr->mu);
114 
115  // This runs in our AcceptServer::RunInIOThread thread.
116  p.reset(); // Possible interrupt point, we do not know what ~ConnectionHandler() does.
117 
118  if (clist_ptr->clist.empty()) {
119  clist_ptr->clist_empty_cnd.notify_one();
120  }
121  });
122  };
123 
124  try {
125  for (;;) {
126  std::tie(handler, ec) = AcceptConnection(wrapper);
127  auto delay_msec = base::GetMonotonicMicrosFast() - fiber_props.awaken_ts();
128  LOG_IF(INFO, delay_msec > 500) << "Had " << delay_msec << " accepting connection";
129 
130  if (ec) {
131  CHECK(!handler);
132  if (ec == asio::error::try_again)
133  continue;
134  LOG_IF(INFO, ec != std::errc::operation_canceled) << "Stopped with error " << ec.message();
135  break; // TODO: To refine it.
136  } else {
137  CHECK_NOTNULL(handler);
138  clist_ptr->clist.push_front(*handler);
139 
140  DCHECK(!clist_ptr->clist.empty());
141  DCHECK(handler->hook_.is_linked());
142 
143  // handler->context() does not necessary equals to wrapper->io_context
144  // and we possibly launching the connection in a different thread.
145  handler->context().AsyncFiber(
146  [&](ConnectionHandler::ptr_t conn_ptr) {
147  conn_ptr->RunInIOThread();
148  clean_cb(std::move(conn_ptr)); // signal our thread that we want to dispose it.
149  }, handler);
150  }
151  }
152  } catch (std::exception const& ex) {
153  LOG(WARNING) << ": caught exception : " << ex.what();
154  }
155 
156  wrapper->listener->PreShutdown();
157 
158  if (!clist_ptr->clist.empty()) {
159  VLOG(1) << "Starting closing connections";
160  unsigned cnt = 0;
161 
162  std::unique_lock<fibers::mutex> lk(clist_ptr->mu);
163  auto it = clist_ptr->clist.begin();
164 
165  // We do not remove connections from clist_ptr->clist, we just signal them to stop.
166  while (it != clist_ptr->clist.end()) {
167  // guarding the current item, preserving it for getting the next item.
168  // The reason for this is it->Close() is interruptable.
169  ConnectionHandler::ptr_t guard(&*it);
170 
171  // it->Close() can preempt and meanwhile *it connection can finish and be deleted in
172  // clean_cb. That will invalidate "it".
173  it->Close();
174 
175  ++it;
176  ++cnt;
177  }
178 
179  VLOG(1) << "Closed " << cnt << " connections";
180 
181  // lk is really redundant but is required by cv-interface:
182  // We update clist only in this thread so the protection is not needed.
183  clist_ptr->wait(lk);
184  }
185 
186  wrapper->listener->PostShutdown();
187 
188  LOG(INFO) << "Accept server stopped for port " << wrapper->port;
189 
190  // Notify that AcceptThread is about to exit.
191  ref_bc_.Dec();
192  // Here accessing wrapper might be unsafe.
193 }
194 
195 auto AcceptServer::AcceptConnection(ListenerWrapper* wrapper) -> AcceptResult {
196  IoContext& io_cntx = pool_->GetNextContext();
197 
198  system::error_code ec;
199  tcp::socket sock(io_cntx.raw_context());
200 
201  wrapper->acceptor.async_accept(sock, fibers_ext::yield[ec]);
202  if (!ec && !sock.is_open())
203  ec = asio::error::try_again;
204  if (ec)
205  return AcceptResult(nullptr, ec);
206  DCHECK(sock.is_open()) << sock.native_handle();
207  VLOG(1) << "Accepted socket " << sock.remote_endpoint() << "/" << sock.native_handle();
208 
209  ConnectionHandler* conn = wrapper->listener->NewConnection(io_cntx);
210  conn->Init(std::move(sock));
211 
212  return AcceptResult(conn, ec);
213 }
214 
215 void AcceptServer::Run() {
216  if (!listeners_.empty()) {
217  ref_bc_.Add(listeners_.size());
218 
219  for (auto& listener : listeners_) {
220  ListenerWrapper* ptr = &listener;
221  io_context& io_cntx = listener.io_context.raw_context();
222  asio::post(io_cntx, [this, ptr] {
223  fibers::fiber srv_fb(&AcceptServer::AcceptInIOThread, this, ptr);
224  srv_fb.detach();
225  });
226  }
227  }
228  was_run_ = true;
229 }
230 
231 void AcceptServer::Wait() {
232  if (was_run_) {
233  ref_bc_.Wait();
234  VLOG(1) << "AcceptServer::Wait completed";
235  } else {
236  CHECK(listeners_.empty()) << "Must Call AcceptServer::Run() after adding listeners";
237  }
238 }
239 
240 } // namespace util