connection_handler.cc
// Copyright 2018, Beeri 15. All rights reserved.
// Author: Roman Gershman (romange@gmail.com)
//

#include "util/asio/connection_handler.h"

#include <boost/asio/dispatch.hpp>
#include <boost/asio/write.hpp>
#include <boost/intrusive/slist.hpp>

#include "base/logging.h"
#include "util/asio/io_context.h"
#include "util/fibers/event_count.h"
#include "util/stats/varz_stats.h"

using namespace boost;
using namespace boost::asio;

using namespace std;

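// Exports the number of currently open connections as a varz counter.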
DEFINE_VARZ(VarzCount, connections);

namespace util {

namespace detail {

using FlushList = intrusive::slist<ConnectionHandler, ConnectionHandler::flush_hook_t,
                                   intrusive::constant_time_size<false>, intrusive::cache_last<false>>;

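// Flusher is a per IO-thread background task. It periodically calls FlushWrites() on every
// connection bound to it, spins while there is data to flush, and goes to sleep otherwise.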
class Flusher : public IoContext::Cancellable {
 public:
  Flusher() {
  }

  void Run() final;
  void Cancel() final;

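  // Registers a connection on this thread's flush list.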
  void Bind(ConnectionHandler* me) {
    flush_conn_list_.push_front(*me);
  }

  void Remove(ConnectionHandler* me) {
    // Makes sure FlushWrites() is not running on `me`. Since the Flusher and ConnectionHandler
    // fibers run on the same thread, either:
    // 1. The Flusher is not running FlushWrites() - then we can just remove ourselves
    //    and the FlushList::iterator will still be valid where it points to.
    // 2. We are inside me->FlushWrites(). In that case mu_ is locked, so we wait until
    //    FlushWrites() finishes before we remove ourselves.
    std::unique_lock<fibers::mutex> lock(mu_);
    flush_conn_list_.erase(FlushList::s_iterator_to(*me));
  }

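  // Wakes the Flusher from its sleep state. Called by connection fibers after they handle
  // a request, so that pending writes get flushed promptly.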
  void WakeIfNeeded() {
    if (sleep_) {
      sleep_ = false;
      sleep_ec_.notify();
    }
  }

 private:
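  // SpinFlush actively polls the registered connections; Sleep suspends the fiber until
  // WakeIfNeeded() is called.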
  void SpinFlush();
  void Sleep();

  bool stop_ = false;
  bool sleep_ = false;  // Flusher is thread-local, so no atomics are needed for thread-safety.

  fibers::condition_variable cv_;
  fibers::mutex mu_;

  // Signals the wakeup from the sleep state; see Sleep() and WakeIfNeeded().
  fibers_ext::EventCount sleep_ec_;
  FlushList flush_conn_list_;
};

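// Alternates between actively polling the connections (SpinFlush) and sleeping until a
// connection wakes it up again. Exits once Cancel() sets stop_.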
void Flusher::Run() {
  VLOG(1) << "Flusher start ";

  while (!stop_) {
    SpinFlush();

    if (!stop_) {
      Sleep();
    }
  }
}

void Flusher::SpinFlush() {
  std::unique_lock<fibers::mutex> lock(mu_);

  uint32_t no_flush = 0;

  // Count consecutive iterations that did not flush anything. If the server has not needed
  // flushing for some time, we return and let Run() go to sleep.
  while (no_flush < 100) {
    cv_.wait_for(lock, 300us);

    if (stop_)
      break;
    ++no_flush;
    for (auto it = flush_conn_list_.begin(); it != flush_conn_list_.end(); ++it) {
      if (it->FlushWrites()) {
        no_flush = 0;
      }
    }
  }
}

void Flusher::Sleep() {
  sleep_ = true;
  sleep_ec_.await([this] { return !sleep_; });
}

void Flusher::Cancel() {
  VLOG(1) << "Flusher::Cancel";
  stop_ = true;

  WakeIfNeeded();
  std::lock_guard<fibers::mutex> lock(mu_);
  cv_.notify_all();
}

}  // namespace detail

namespace {

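// Returns true when the error code indicates a normal connection termination, so callers
// do not log a warning for it.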
inline bool IsExpectedFinish(system::error_code ec) {
  return ec == error::eof || ec == error::operation_aborted || ec == error::connection_reset ||
         ec == error::not_connected;
}

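// One Flusher instance per IO thread, created lazily by the first connection that uses
// the flusher fiber.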
static thread_local detail::Flusher* local_flusher = nullptr;

}  // namespace

ConnectionHandler::ConnectionHandler(IoContext* context) noexcept : io_context_(*context) {
  CHECK_NOTNULL(context);
}

ConnectionHandler::~ConnectionHandler() {
}

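// Takes ownership of a freshly accepted socket after disabling Nagle's algorithm and
// switching it to non-blocking mode.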
void ConnectionHandler::Init(asio::ip::tcp::socket&& sock) {
  CHECK(!socket_ && sock.is_open());
  ip::tcp::no_delay nd(true);
  system::error_code ec;
  sock.set_option(nd, ec);
  if (ec)
    LOG(ERROR) << "Could not set socket option " << ec.message() << " " << ec;

  sock.non_blocking(true, ec);
  if (ec)
    LOG(ERROR) << "Could not make socket nonblocking " << ec.message() << " " << ec;

  socket_.emplace(std::move(sock));
  CHECK(socket_->is_open());
}

/*****************************************************************************
 * fiber function per server connection
 *****************************************************************************/
void ConnectionHandler::RunInIOThread() {
  DCHECK(io_context_.InContextThread());

  connections.Inc();

  CHECK(socket_);
  OnOpenSocket();

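  // Lazily create this thread's Flusher and register the connection with it.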
  if (use_flusher_fiber_) {
    if (!local_flusher) {
      local_flusher = new detail::Flusher;
      io_context_.AttachCancellable(local_flusher);
    }
    local_flusher->Bind(this);
  }

  VLOG(1) << "ConnectionHandler::RunInIOThread: " << socket_->native_handle();
  system::error_code ec;

  try {
    while (socket_->is_open()) {
      ec = HandleRequest();
      if (UNLIKELY(ec)) {
        if (!IsExpectedFinish(ec)) {
          LOG(WARNING) << "[" << socket_->native_handle() << "] Error : " << ec.message() << ", "
                       << ec.category().name() << "/" << ec.value();
        }
        break;
      }
      if (use_flusher_fiber_) {
        local_flusher->WakeIfNeeded();
      }
    }
    VLOG(1) << "ConnectionHandler closed: " << socket_->native_handle();
  } catch (std::exception const& ex) {
    string str = ex.what();
    LOG(ERROR) << str;
  }

  if (use_flusher_fiber_) {
    local_flusher->Remove(this);
  }

  Close();

  connections.IncBy(-1);

  // RunInIOThread runs inside a lambda that holds a ptr_t guard on `this`. Once the lambda
  // finishes, it releases its ownership of `this`.
}

void ConnectionHandler::Close() {
  // Run the Listener hook in the connection thread.
  io_context_.AwaitSafe([this] {
    if (!socket_->is_open())
      return;

    system::error_code ec;
    VLOG(1) << "Before shutdown " << socket_->native_handle();
    socket_->Shutdown(ec);
    VLOG(1) << "After shutdown: " << ec << " " << ec.message();

    // socket::close() closes the underlying socket and cancels pending operations.
    // HOWEVER, the problem is that those operations then return with ec = ok(),
    // so the flow is not aware that the socket is closed.
    // That can lead to nasty bugs. Therefore the only place we close
    // the socket is the listener loop. Here we only signal that we are ready to close.
    OnCloseSocket();
  });
}

void ListenerInterface::RegisterPool(IoContextPool* pool) {
  // In tests we might relaunch AcceptServer with the same listener, so we allow
  // reassigning the same pool.
  CHECK(pool_ == nullptr || pool_ == pool);
  pool_ = pool;
}

}  // namespace util