rpc_conn_handler.cc
// Copyright 2018, Beeri 15. All rights reserved.
// Author: Roman Gershman (romange@gmail.com)
//
#include "util/rpc/impl/rpc_conn_handler.h"

#include <boost/asio/read.hpp>
#include <boost/asio/write.hpp>

#include "base/flags.h"
#include "base/logging.h"

#include "util/asio/asio_utils.h"
#include "util/asio/io_context.h"

namespace util {
namespace rpc {

using namespace std::chrono_literals;

using asio::ip::tcp;
using fibers_ext::yield;

namespace {

constexpr size_t kRpcPoolSize = 32;

}  // namespace

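// Sets up a fixed pool of kRpcPoolSize reusable RpcItem buffers and enables the flusher
// fiber provided by ConnectionHandler, which is expected to call FlushWrites() periodically
// so queued responses do not linger in outgoing_buf_.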
RpcConnectionHandler::RpcConnectionHandler(ConnectionBridge* bridge, IoContext* context)
    : ConnectionHandler(context), bridge_(bridge), rpc_items_(kRpcPoolSize) {
  use_flusher_fiber_ = true;
}

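// Joins the bridge before returning the items still queued in outgoing_buf_ to the pool.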
RpcConnectionHandler::~RpcConnectionHandler() {
  bridge_->Join();

  outgoing_buf_.clear_and_dispose([this](RpcItem* i) { rpc_items_.Release(i); });
}

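// Invoked by the flusher fiber. Flushes pending responses while the socket is still open
// and counts successful flushes in req_flushes_.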
bool RpcConnectionHandler::FlushWrites() {
  bool res = false;
  if (socket_->is_open() && !outgoing_buf_.empty()) {
    res = FlushWritesInternal();
    req_flushes_ += uint64_t(res);
  }
  return res;
}

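// Lets the bridge perform per-connection initialization once the socket is opened.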
void RpcConnectionHandler::OnOpenSocket() {
  VLOG(1) << "OnOpenSocket: " << socket_->native_handle();

  bridge_->InitInThread();
}

void RpcConnectionHandler::OnCloseSocket() {
  VLOG(1) << "OnCloseSocket: Before flush fiber join " << socket_->native_handle();
  DCHECK(io_context_.InContextThread());
}

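// Reads a single rpc frame and its envelope from the socket, then hands the envelope to
// bridge_ together with a writer callback that queues responses into outgoing_buf_.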
system::error_code RpcConnectionHandler::HandleRequest() {
  VLOG(2) << "HandleRequest " << socket_->is_open() << " / "
          << (socket_->is_open() ? socket_->remote_endpoint(ec_) : tcp::endpoint());

  if (ec_)
    return ec_;

  rpc::Frame frame;
  ec_ = frame.Read(&socket_.value());
  if (ec_) {
    return ec_;
  }

  DCHECK_NE(-1, socket_->native_handle());

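  // Backpressure: if the item pool is exhausted, flush the pending responses so that their
  // items are released back to the pool before we try to acquire one below.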
  if (rpc_items_.empty() && !outgoing_buf_.empty()) {
    req_flushes_ += FlushWritesInternal();
  }

  // We use a pooled item for reading the envelope.
  auto item_ptr = rpc_items_.make_unique();

  Envelope* envelope = &item_ptr->envelope;
  envelope->Resize(frame.header_size, frame.letter_size);
  auto rbuf_seq = item_ptr->buf_seq();
  asio::read(*socket_, rbuf_seq, ec_);
  if (ec_) {
    VLOG(1) << "read " << ec_ << " /" << socket_->native_handle();
    return ec_;
  }
  DCHECK_NE(-1, socket_->native_handle());

  // To support streaming we have this writer that can write multiple envelopes per
  // single rpc request. We pass captures by value to allow asynchronous invocation
  // of ConnectionBridge::HandleEnvelope. We move the writer object into HandleEnvelope,
  // which becomes responsible for owning it until the handler finishes.
  // Note that the writer resets its 'item' capture (the lambda is mutable), so only the
  // first outgoing envelope reuses the RpcItem that served the read, which reduces
  // allocations.
  auto writer = [rpc_id = frame.rpc_id, item = item_ptr.release(), this](Envelope&& env) mutable {
    RpcItem* next = item ? item : rpc_items_.Get();

    next->envelope = std::move(env);
    next->id = rpc_id;
    outgoing_buf_.push_back(*next);
    item = nullptr;
  };

  // Might be asynchronous, depends on the bridge_.
  bridge_->HandleEnvelope(frame.rpc_id, envelope, std::move(writer));

  return ec_;
}

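// Writes everything queued in outgoing_buf_ to the socket in a single gather-write.
// Returns false when another fiber already holds the write lock, there is nothing to send,
// or the socket is closed.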
bool RpcConnectionHandler::FlushWritesInternal() {
  // Serves as a critical section: we can not allow interleaved writes into the socket.
  // If another fiber is already flushing, we just exit without blocking.
  std::unique_lock<fibers::mutex> ul(wr_mu_, std::try_to_lock);
  if (!ul || outgoing_buf_.empty() || !socket_->is_open())
    return false;

  VLOG(2) << "FlushWritesInternal: " << outgoing_buf_.size();
  size_t count = outgoing_buf_.size();
  write_seq_.resize(count * 3);
  frame_buf_.resize(count);

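  // Each queued response contributes three buffers to the gather-write: its serialized
  // frame, the envelope header and the envelope letter.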
  ItemList tmp;
  size_t item_index = 0;
  for (RpcItem& item : outgoing_buf_) {  // Iterate over the intrusive list.
    Frame f(item.id, item.envelope.header.size(), item.envelope.letter.size());

    uint8_t* buf = frame_buf_[item_index].data();
    size_t frame_sz = f.Write(buf);
    write_seq_[3 * item_index] = asio::buffer(buf, frame_sz);
    write_seq_[3 * item_index + 1] = asio::buffer(item.envelope.header);
    write_seq_[3 * item_index + 2] = asio::buffer(item.envelope.letter);
    ++item_index;
  }
  tmp.swap(outgoing_buf_);

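  // Blocking gather-write of all collected buffers. A write error is stored in ec_ and is
  // observed at the start of the next HandleRequest() call.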
  size_t write_sz = asio::write(*socket_, write_seq_, ec_);

  // We should use clear_and_dispose to delete items safely while unlinking them from tmp.
  tmp.clear_and_dispose([this](RpcItem* i) { rpc_items_.Release(i); });

  VLOG(2) << "Wrote " << count << " requests with " << write_sz << " bytes";
  return true;
}

}  // namespace rpc
}  // namespace util