5 #include "util/asio/connection_handler.h" 7 #include <boost/asio/dispatch.hpp> 8 #include <boost/asio/write.hpp> 10 #include "base/logging.h" 11 #include "util/asio/io_context.h" 12 #include "util/fibers/event_count.h" 13 #include "util/stats/varz_stats.h" 15 using namespace boost;
16 using namespace boost::asio;
// Varz counter named `connections`; ConnectionHandler::RunInIOThread decrements it
// via connections.IncBy(-1) when a connection finishes.
20 DEFINE_VARZ(VarzCount, connections);
// List of ConnectionHandler objects linked through ConnectionHandler::flush_hook_t.
// Configured with constant_time_size<false> and cache_last<false>.
// NOTE(review): presumably a boost::intrusive slist (s_iterator_to is used on it) — confirm.
26 using FlushList = detail::slist<ConnectionHandler, ConnectionHandler::flush_hook_t,
27 detail::constant_time_size<false>, detail::cache_last<false>>;
38 flush_conn_list_.push_front(*me);
48 std::unique_lock<fibers::mutex> lock(mu_);
49 flush_conn_list_.erase(FlushList::s_iterator_to(*me));
66 fibers::condition_variable cv_;
72 FlushList flush_conn_list_;
76 VLOG(1) <<
"Flusher start ";
// Repeatedly flushes pending writes of all connections in flush_conn_list_.
// Takes mu_ for the duration and loops while no_flush stays below 100.
// NOTE(review): the statements that update no_flush are not visible in this excerpt;
// presumably it counts rounds in which nothing was flushed — confirm.
87 void Flusher::SpinFlush() {
88 std::unique_lock<fibers::mutex> lock(mu_);
90 uint32_t no_flush = 0;
94 while (no_flush < 100) {
// Wait on cv_ for at most 300us before the next pass over the list.
95 cv_.wait_for(lock, 300us);
// Visit every connection in the list and ask it to flush its writes.
100 for (
auto it = flush_conn_list_.begin(); it != flush_conn_list_.end(); ++it) {
101 if (it->FlushWrites()) {
// Awaits sleep_ec_ with a predicate that is satisfied once sleep_ is false.
108 void Flusher::Sleep() {
110 sleep_ec_.await([
this] {
return !sleep_;});
// Cancels the flusher: logs at verbosity 1 and acquires mu_.
// NOTE(review): the statements executed under the lock are not visible in this excerpt.
113 void Flusher::Cancel() {
114 VLOG(1) <<
"Flusher::Cancel";
118 std::lock_guard<fibers::mutex> lock(mu_);
// Returns true when `ec` is one of eof, operation_aborted, connection_reset or
// not_connected. RunInIOThread uses this to decide whether an error returned by
// HandleRequest() deserves a warning log.
126 inline bool IsExpectedFinish(system::error_code ec) {
127 return ec == error::eof || ec == error::operation_aborted || ec == error::connection_reset ||
128 ec == error::not_connected;
// Per-thread Flusher pointer, initially null. ConnectionHandler::RunInIOThread
// allocates it on first use and hands it to io_context_.AttachCancellable().
131 static thread_local detail::Flusher* local_flusher =
nullptr;
// Constructs a handler bound to the given IoContext; io_context_ is initialized
// from *context.
// NOTE(review): the initializer list dereferences `context` before CHECK_NOTNULL
// runs in the body, so the check cannot guard that dereference — consider
// validating inside the initializer instead.
135 ConnectionHandler::ConnectionHandler(IoContext* context) noexcept : io_context_(*context) {
136 CHECK_NOTNULL(context);
// Destructor. NOTE(review): the body is not visible in this excerpt.
139 ConnectionHandler::~ConnectionHandler() {
// Takes over an open TCP socket for this handler.
// Preconditions (CHECKed): socket_ is not yet set and `sock` is open.
// Sets ip::tcp::no_delay(true) and non-blocking mode on the socket, logging an
// error message for each option; then moves the socket into socket_ and CHECKs
// that it is still open.
// NOTE(review): the LOG(ERROR) statements are presumably guarded by `if (ec)` on
// lines not visible in this excerpt — confirm.
142 void ConnectionHandler::Init(asio::ip::tcp::socket&& sock) {
143 CHECK(!socket_ && sock.is_open());
144 ip::tcp::no_delay nd(
true);
145 system::error_code ec;
// Error-code overload: failure is reported through `ec`.
146 sock.set_option(nd, ec);
148 LOG(ERROR) <<
"Could not set socket option " << ec.message() <<
" " << ec;
150 sock.non_blocking(
true, ec);
152 LOG(ERROR) <<
"Could not make socket nonblocking " << ec.message() <<
" " << ec;
154 socket_.emplace(std::move(sock));
155 CHECK(socket_->is_open());
// Main loop of a connection. Must be called on the io_context_ thread (DCHECKed).
// While the socket is open it calls HandleRequest(); errors for which
// IsExpectedFinish() is false are logged as warnings. When use_flusher_fiber_ is
// set, the handler is bound to the thread-local Flusher at the start, wakes it
// inside the loop and is removed from it at the end. Finally the `connections`
// counter is decremented.
// NOTE(review): several lines (including the `try` opener and loop exits) are not
// visible in this excerpt.
161 void ConnectionHandler::RunInIOThread() {
162 DCHECK(io_context_.InContextThread());
169 if (use_flusher_fiber_) {
// Create the per-thread Flusher on first use and register it with io_context_.
170 if (!local_flusher) {
171 local_flusher =
new detail::Flusher;
172 io_context_.AttachCancellable(local_flusher);
174 local_flusher->Bind(
this);
177 VLOG(1) <<
"ConnectionHandler::RunInIOThread: " << socket_->native_handle();
178 system::error_code ec;
181 while (socket_->is_open()) {
182 ec = HandleRequest();
// Only errors outside the expected-termination set are reported.
184 if (!IsExpectedFinish(ec)) {
185 LOG(WARNING) <<
"[" << socket_->native_handle() <<
"] Error : " << ec.message() <<
", " 186 << ec.category().name() <<
"/" << ec.value();
190 if (use_flusher_fiber_) {
191 local_flusher->WakeIfNeeded();
194 VLOG(1) <<
"ConnectionHandler closed: " << socket_->native_handle();
195 }
catch (std::exception
const& ex) {
// Copies the exception message into a local string.
196 string str = ex.what();
200 if (use_flusher_fiber_) {
201 local_flusher->Remove(
this);
206 connections.IncBy(-1);
// Shuts the socket down by running a lambda through io_context_.AwaitSafe().
// The lambda first tests whether the socket is open, then calls
// socket_->Shutdown(ec) and logs the handle before and the error code after.
// NOTE(review): the statement taken when the socket is not open is not visible
// in this excerpt (presumably an early return).
212 void ConnectionHandler::Close() {
214 io_context_.AwaitSafe([
this] {
215 if (!socket_->is_open())
218 system::error_code ec;
219 VLOG(1) <<
"Before shutdown " << socket_->native_handle();
220 socket_->Shutdown(ec);
221 VLOG(1) <<
"After shutdown: " << ec <<
" " << ec.message();
// Registers the IoContextPool for this listener. CHECK-fails if a different pool
// was already registered: pool_ must be null or equal to `pool`.
232 void ListenerInterface::RegisterPool(IoContextPool* pool) {
235 CHECK(pool_ ==
nullptr || pool_ == pool);