4 #include "util/rpc/impl/rpc_conn_handler.h" 6 #include <boost/asio/read.hpp> 7 #include <boost/asio/write.hpp> 9 #include "base/flags.h" 10 #include "base/logging.h" 12 #include "util/asio/asio_utils.h" 13 #include "util/asio/io_context.h" 18 using namespace std::chrono_literals;
23 using fibers_ext::yield;
// Value passed to the rpc_items_ constructor in RpcConnectionHandler's initializer list.
// NOTE(review): presumably the capacity of the per-connection RpcItem pool - confirm
// against the pool type, which is declared outside this file.
25 constexpr
size_t kRpcPoolSize = 32;
// Constructs a handler: forwards `context` to the ConnectionHandler base, stores
// `bridge` in bridge_, and constructs rpc_items_ with kRpcPoolSize.
27 RpcConnectionHandler::RpcConnectionHandler(ConnectionBridge* bridge, IoContext* context)
28 : ConnectionHandler(context), bridge_(bridge), rpc_items_(kRpcPoolSize) {
// Turns on use_flusher_fiber_.
// NOTE(review): presumably makes the base class run a fiber that periodically calls
// FlushWrites() - the consumer of this flag is not visible here, confirm.
29 use_flusher_fiber_ =
true;
// Destructor: empties outgoing_buf_ and hands every RpcItem still queued in it
// back to rpc_items_ through Release().
32 RpcConnectionHandler::~RpcConnectionHandler() {
35 outgoing_buf_.clear_and_dispose([
this](RpcItem* i) { rpc_items_.Release(i); });
// Flushes queued outgoing items when the socket is open and outgoing_buf_ is not empty.
// NOTE(review): the declaration of `res` and the return statement are not visible in
// this excerpt; presumably `res` is a bool defaulting to false and is what is returned.
38 bool RpcConnectionHandler::FlushWrites() {
// Skip the flush entirely when there is nothing queued or the socket is closed.
40 if (socket_->is_open() && !outgoing_buf_.empty()) {
41 res = FlushWritesInternal();
// Add the flush outcome, converted to uint64_t, to the req_flushes_ counter.
42 req_flushes_ += uint64_t(res);
// Logs the socket's native handle at verbosity 1 and then calls
// InitInThread() on the bridge.
47 void RpcConnectionHandler::OnOpenSocket() {
48 VLOG(1) <<
"OnOpenSocket: " << socket_->native_handle();
// NOTE(review): presumably lets the bridge set up state tied to the calling
// thread - confirm against ConnectionBridge.
50 bridge_->InitInThread();
// Logs the socket's native handle at verbosity 1 and, in debug builds, checks that
// the call runs on the io_context_ thread.
// NOTE(review): the log text mentions a flush-fiber join, but no join is visible in
// this excerpt; the remainder of the body may be missing.
53 void RpcConnectionHandler::OnCloseSocket() {
54 VLOG(1) <<
"OnCloseSocket: Before flush fiber join " << socket_->native_handle();
55 DCHECK(io_context_.InContextThread());
// Reads one request from the socket (a frame, then the envelope payload the frame
// describes) and passes it to the bridge together with a writer callback that
// queues the reply envelope on outgoing_buf_.
// NOTE(review): the declaration of `frame`, the error checks that presumably follow
// each read, and the return statement are not visible in this excerpt.
58 system::error_code RpcConnectionHandler::HandleRequest() {
// Log whether the socket is open and its remote endpoint (a default-constructed
// endpoint when closed). remote_endpoint() reports failures through ec_.
59 VLOG(2) <<
"HandleRequest " << socket_->is_open() <<
" / " 60 << (socket_->is_open() ? socket_->remote_endpoint(ec_) : tcp::endpoint());
// Read the frame from the socket; the resulting error code is stored in ec_.
66 ec_ = frame.Read(&socket_.value());
71 DCHECK_NE(-1, socket_->native_handle());
// When rpc_items_ reports empty while replies are still queued, flush first:
// FlushWritesInternal() releases the written items back to rpc_items_.
// NOTE(review): assumes empty() means "no free items left in the pool" - confirm.
73 if (rpc_items_.empty() && !outgoing_buf_.empty()) {
74 req_flushes_ += FlushWritesInternal();
// Obtain an RpcItem from the pool, held by a smart pointer for now.
78 auto item_ptr = rpc_items_.make_unique();
// Size the item's envelope to the header and letter sizes given by the frame.
80 Envelope* envelope = &item_ptr->envelope;
81 envelope->Resize(frame.header_size, frame.letter_size);
// Read the payload from the socket into the item's buffer sequence; errors go to ec_.
82 auto rbuf_seq = item_ptr->buf_seq();
83 asio::read(*socket_, rbuf_seq, ec_);
// Note: the log text says "async_read" although the call above is asio::read.
85 VLOG(1) <<
"async_read " << ec_ <<
" /" << socket_->native_handle();
88 DCHECK_NE(-1, socket_->native_handle());
// Writer callback given to the bridge. item_ptr.release() moves the RpcItem out of
// the smart pointer into the lambda's `item` capture as a raw pointer; `envelope`
// above still points into that same item.
97 auto writer = [rpc_id = frame.rpc_id, item = item_ptr.release(),
this](Envelope&& env)
mutable {
// Use the captured item when it is non-null, otherwise take one from the pool.
// NOTE(review): the lambda is `mutable` and captures rpc_id, so the lines missing
// from this excerpt presumably set next->id and reset `item` - confirm.
98 RpcItem* next = item ? item : rpc_items_.Get();
100 next->envelope = std::move(env);
// Queue the item; it is sent by a later FlushWritesInternal() call.
102 outgoing_buf_.push_back(*next);
// Hand the request to the bridge along with the writer callback.
107 bridge_->HandleEnvelope(frame.rpc_id, envelope, std::move(writer));
// Writes every item currently in outgoing_buf_ to the socket with a single
// asio::write call, then releases those items back to rpc_items_.
// NOTE(review): the return statements, the declaration of `tmp` and the increment of
// item_index are not visible in this excerpt.
112 bool RpcConnectionHandler::FlushWritesInternal() {
// Try to take wr_mu_ without blocking. The guard below fires when the lock was not
// acquired, nothing is queued, or the socket is closed.
// NOTE(review): the guarded statement is not visible; presumably `return false`.
115 std::unique_lock<fibers::mutex> ul(wr_mu_, std::try_to_lock_t{});
116 if (!ul || outgoing_buf_.empty() || !socket_->is_open())
119 VLOG(2) <<
"FlushWritesGuarded: " << outgoing_buf_.size();
// Three write buffers per item (frame bytes, envelope header, envelope letter)
// and one frame scratch buffer per item.
120 size_t count = outgoing_buf_.size();
121 write_seq_.resize(count * 3);
122 frame_buf_.resize(count);
125 size_t item_index = 0;
126 for (RpcItem& item : outgoing_buf_) {
// Build the frame from the item's id and its header/letter sizes.
127 Frame f(item.id, item.envelope.header.size(), item.envelope.letter.size());
// Serialize the frame into this item's scratch buffer; Write() returns the byte count.
129 uint8_t* buf = frame_buf_[item_index].data();
130 size_t frame_sz = f.Write(buf);
131 write_seq_[3 * item_index] = asio::buffer(buf, frame_sz);
132 write_seq_[3 * item_index + 1] = asio::buffer(item.envelope.header);
133 write_seq_[3 * item_index + 2] = asio::buffer(item.envelope.letter);
// Swap the queued items into tmp, leaving outgoing_buf_ with tmp's previous contents.
// NOTE(review): presumably tmp starts empty so new replies can be queued while the
// write is in progress - confirm.
136 tmp.swap(outgoing_buf_);
// Send all buffers in one call; any error is stored in ec_.
138 size_t write_sz = asio::write(*socket_, write_seq_, ec_);
// Return the written items to the pool.
141 tmp.clear_and_dispose([
this](RpcItem* i) { rpc_items_.Release(i); });
143 VLOG(2) <<
"Wrote " << count <<
" requests with " << write_sz <<
" bytes";