channel.cc
1 // Copyright 2018, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 #include <boost/asio/write.hpp>
5 #include <boost/fiber/future.hpp>
6 #include <chrono>
7 
8 #include "util/rpc/channel.h"
9 
10 #include "base/logging.h"
11 #include "util/asio/asio_utils.h"
12 #include "util/rpc/frame_format.h"
13 #include "util/rpc/rpc_envelope.h"
14 
15 namespace util {
16 namespace rpc {
17 
18 DEFINE_uint32(rpc_client_pending_limit, 1 << 17,
19  "How many outgoing requests we are ready to accommodate before rejecting "
20  "a new RPC request");
21 
22 DEFINE_uint32(rpc_client_queue_size, 128,
23  "The size of the outgoing batch queue that contains envelopes waiting to send.");
24 
25 using namespace boost;
26 using namespace std;
27 using asio::ip::tcp;
28 using folly::RWSpinLock;
29 namespace error = asio::error;
30 
31 namespace {
32 
33 bool IsExpectedFinish(system::error_code ec) {
34  return ec == error::eof || ec == error::operation_aborted || ec == error::connection_reset ||
35  ec == error::not_connected;
36 }
37 
38 constexpr uint32_t kTickPrecision = 3; // 3ms per timer tick.
39 
40 } // namespace
41 
42 Channel::~Channel() {
43  Shutdown();
44 
45  CHECK(read_fiber_.joinable());
46  read_fiber_.join();
47  flush_fiber_.join();
48  VLOG(1) << "After ReadFiberJoin";
49 }
50 
51 void Channel::Shutdown() {
52  error_code ec;
53  socket_->Shutdown(ec);
54 }
55 
56 auto Channel::Connect(uint32_t ms) -> error_code {
57  CHECK(!read_fiber_.joinable());
58  error_code ec = socket_->ClientWaitToConnect(ms);
59 
60  IoContext& context = socket_->context();
61  context.Await([this] {
62  read_fiber_ = fibers::fiber(&Channel::ReadFiber, this);
63  flush_fiber_ = fibers::fiber(&Channel::FlushFiber, this);
64  });
65  expiry_task_.reset(new PeriodicTask(context, chrono::milliseconds(kTickPrecision)));
66 
67  expiry_task_->Start([this](int ticks) {
68  DCHECK_GT(ticks, 0);
69  this->expire_timer_.advance(ticks);
70  DVLOG(3) << "Advancing expiry to " << this->expire_timer_.now();
71  });
72 
73  return ec;
74 }
75 
76 auto Channel::PresendChecks() -> error_code {
77  if (!socket_->is_open()) {
78  return asio::error::shut_down;
79  }
80 
81  if (socket_->status()) {
82  return socket_->status();
83  }
84 
85  if (pending_calls_size_.load(std::memory_order_relaxed) >= FLAGS_rpc_client_pending_limit) {
86  return asio::error::no_buffer_space;
87  }
88 
89  error_code ec;
90 
91  if (outgoing_buf_size_.load(std::memory_order_relaxed) >= FLAGS_rpc_client_queue_size) {
92  ec = FlushSends();
93  }
94  return ec;
95 }
96 
97 auto Channel::Send(uint32 deadline_msec, Envelope* envelope) -> future_code_t {
98  DCHECK(read_fiber_.joinable()) << "Call Channel::Connect(), stupid.";
99  DCHECK_GT(deadline_msec, 0);
100 
101  // ----
102  fibers::promise<error_code> p;
103  fibers::future<error_code> res = p.get_future();
104  error_code ec = PresendChecks();
105 
106  if (ec) {
107  p.set_value(ec);
108  return res;
109  }
110 
111  uint32_t ticks = (deadline_msec + kTickPrecision - 1) / kTickPrecision;
112  std::unique_ptr<ExpiryEvent> ev(new ExpiryEvent(this));
113 
114  // We protect against Send thread vs IoContext thread data races.
115  // Fibers inside IoContext thread do not have to protect against each other since
116  // they do not cause data races. So we use lock_shared as "no-op" lock that only becomes
117  // relevant if someone outside IoContext thread locks exclusively.
118  // Also this lock allows multi-threaded access for Send operation.
119  bool lock_exclusive = OutgoingBufLock();
120  RpcId id = next_send_rpc_id_++;
121 
122  ev->set_id(id);
123  base::Tick at = expire_timer_.schedule(ev.get(), ticks);
124  DVLOG(2) << "Scheduled expiry at " << at << " for rpcid " << id;
125 
126  outgoing_buf_.emplace_back(SendItem(id, PendingCall{std::move(p), envelope}));
127  outgoing_buf_.back().second.expiry_event = std::move(ev);
128  outgoing_buf_size_.store(outgoing_buf_.size(), std::memory_order_relaxed);
129 
130  OutgoingBufUnlock(lock_exclusive);
131 
132  return res;
133 }
134 
135 auto Channel::SendAndReadStream(Envelope* msg, MessageCallback cb) -> error_code {
136  DCHECK(read_fiber_.joinable());
137 
138  // ----
139  error_code ec = PresendChecks();
140  if (ec) {
141  return ec;
142  }
143 
144  fibers::promise<error_code> p;
145  fibers::future<error_code> future = p.get_future();
146 
147  // We protect against Send thread vs IoContext thread data races.
148  // Fibers inside IoContext thread do not have to protect against each other.
149  // Therefore use lock_shared as "no-op" lock that only becomes
150  // relevant if someone outside IoContext thread locks exclusively.
151  // Also this lock allows multi-threaded access for Send operation.
152  bool exclusive = OutgoingBufLock();
153 
154  RpcId id = next_send_rpc_id_++;
155 
156  outgoing_buf_.emplace_back(SendItem(id, PendingCall{std::move(p), msg, std::move(cb)}));
157  outgoing_buf_size_.store(outgoing_buf_.size(), std::memory_order_relaxed);
158 
159  OutgoingBufUnlock(exclusive);
160  ec = future.get();
161 
162  return ec;
163 }
164 
165 void Channel::ReadFiber() {
166  CHECK(socket_->context().InContextThread());
167 
168  VLOG(1) << "Start ReadFiber on socket " << socket_->native_handle();
169  this_fiber::properties<IoFiberProperties>().SetNiceLevel(1);
170 
171  while (socket_->is_open()) {
172  error_code ec = ReadEnvelope();
173  if (ec) {
174  LOG_IF(WARNING, !IsExpectedFinish(ec))
175  << "Error reading envelope " << ec << " " << ec.message();
176 
177  CancelPendingCalls(ec);
178  // Required for few reasons:
179  // 1. To preempt the fiber, otherwise it has busy loop in case socket returns the error
180  // preventing other fibers to run.
181  // 2. To throttle cpu.
182  this_fiber::sleep_for(10ms);
183  continue;
184  }
185  }
186  CancelPendingCalls(error_code{});
187  VLOG(1) << "Finish ReadFiber on socket " << socket_->native_handle();
188 }
189 
190 
191 // TODO: To attach Flusher io context thread, similarly to server side.
192 void Channel::FlushFiber() {
193  using namespace std::chrono_literals;
194  CHECK(socket_->context().get_executor().running_in_this_thread());
195  this_fiber::properties<IoFiberProperties>().SetNiceLevel(IoFiberProperties::MAX_NICE_LEVEL - 1);
196 
197  while (true) {
198  this_fiber::sleep_for(300us);
199  if (!socket_->is_open())
200  break;
201 
202  if (outgoing_buf_size_.load(std::memory_order_acquire) == 0 || !send_mu_.try_lock())
203  continue;
204  VLOG(1) << "FlushFiber::FlushSendsGuarded";
205  FlushSendsGuarded();
206  outgoing_buf_size_.store(outgoing_buf_.size(), std::memory_order_release);
207 
208  send_mu_.unlock(); // releases the fence
209  }
210 }
211 
212 auto Channel::FlushSends() -> error_code {
213  // We call FlushSendsGuarded directly from Send fiber because it calls socket.Write
214  // synchronously and we can not Post blocking function into io_context.
215  std::lock_guard<fibers::mutex> guard(send_mu_);
216 
217  error_code ec;
218 
219  // We use `while` because multiple fibers might fill outgoing_buf_
220  // and when the current fiber resumes, the buffer might be full again.
221  while (outgoing_buf_.size() >= FLAGS_rpc_client_queue_size) {
222  ec = FlushSendsGuarded();
223  }
224  outgoing_buf_size_.store(outgoing_buf_.size(), std::memory_order_relaxed);
225 
226  return ec; // Return the last known status code.
227 }
228 
229 auto Channel::FlushSendsGuarded() -> error_code {
230  error_code ec;
231  // This function runs only in IOContext thread. Therefore only
232  if (outgoing_buf_.empty())
233  return ec;
234 
235  ec = socket_->status();
236  if (ec) {
237  CancelSentBufferGuarded(ec);
238  return ec;
239  }
240 
241  // The following section is CPU-only - No IO blocks.
242  {
243  RWSpinLock::ReadHolder holder(buf_lock_); // protect outgoing_buf_ against Send path
244 
245  size_t count = outgoing_buf_.size();
246  write_seq_.resize(count * 3);
247  frame_buf_.resize(count);
248  for (size_t i = 0; i < count; ++i) {
249  auto& p = outgoing_buf_[i];
250  Frame f(p.first, p.second.envelope->header.size(), p.second.envelope->letter.size());
251  size_t sz = f.Write(frame_buf_[i].data());
252 
253  write_seq_[3 * i] = asio::buffer(frame_buf_[i].data(), sz);
254  write_seq_[3 * i + 1] = asio::buffer(p.second.envelope->header);
255  write_seq_[3 * i + 2] = asio::buffer(p.second.envelope->letter);
256  }
257 
258  // Fill the pending call before the socket.Write() because otherwise in case it blocks
259  // *after* it sends, the current fiber might resume after Read fiber receives results
260  // and it would not find them inside pending_calls_.
261  pending_calls_size_.fetch_add(count, std::memory_order_relaxed);
262  for (size_t i = 0; i < count; ++i) {
263  auto& item = outgoing_buf_[i];
264  auto emplace_res = pending_calls_.emplace(item.first, std::move(item.second));
265  CHECK(emplace_res.second);
266  }
267  outgoing_buf_.clear();
268  }
269 
270  // Interrupt point during which outgoing_buf_ could grow.
271  // We do not lock because this function is the only one that writes into channel and it's
272  // guarded by send_mu_.
273  asio::write(*socket_, write_seq_, ec);
274  if (ec) {
275  // I do not know if we need to flush everything but I prefer doing it to make it simpler.
276  CancelPendingCalls(ec);
277  return ec;
278  }
279 
280  return ec;
281 }
282 
283 void Channel::ExpirePending(RpcId id) {
284  DVLOG(1) << "Expire rpc id " << id;
285 
286  auto it = this->pending_calls_.find(id);
287  if (it == this->pending_calls_.end()) {
288  // TODO: there could be that the call is in outgoing_buf_ and we did not expiry it.
289  return;
290  }
291  // The order is important to eliminate interrupts.
292  EcPromise pr = std::move(it->second.promise);
293  this->pending_calls_.erase(it);
294  pr.set_value(asio::error::timed_out);
295 }
296 
297 void Channel::CancelSentBufferGuarded(error_code ec) {
298  std::vector<SendItem> tmp;
299 
300  buf_lock_.lock_shared();
301  tmp.swap(outgoing_buf_);
302  buf_lock_.unlock_shared();
303 
304  for (auto& item : tmp) {
305  auto promise = std::move(item.second.promise);
306  promise.set_value(ec);
307  }
308 }
309 
310 auto Channel::ReadEnvelope() -> error_code {
311  Frame f;
312  error_code ec = f.Read(socket_.get());
313  if (ec)
314  return ec;
315 
316  VLOG(2) << "Got rpc_id " << f.rpc_id << " from socket " << socket_->native_handle();
317 
318  auto it = pending_calls_.find(f.rpc_id);
319  if (it == pending_calls_.end()) {
320  // It might happens if for some reason we flushed pending_calls_ or the rpc has expired and
321  // the envelope reached us afterwards. We just consume it.
322  VLOG(1) << "Unknown id " << f.rpc_id;
323 
324  Envelope envelope(f.header_size, f.letter_size);
325 
326  // ReadEnvelope is called via Channel::Apply, so no need to call it here.
327  asio::read(*socket_, envelope.buf_seq(), ec);
328  return ec;
329  }
330 
331  // -- NO interrupt section begin
332  PendingCall& call = it->second;
333  Envelope* env = call.envelope;
334  env->Resize(f.header_size, f.letter_size);
335  bool is_stream = static_cast<bool>(call.cb);
336 
337  if (is_stream) {
338  VLOG(1) << "Processing stream";
339  asio::read(*socket_, env->buf_seq(), ec);
340  if (!ec) {
341  HandleStreamResponse(f.rpc_id);
342  }
343 
344  return ec;
345  }
346 
347  fibers::promise<error_code> promise = std::move(call.promise);
348  // We erase before reading from the socket/setting promise because pending_calls_ might change
349  // when we resume after IO and 'it' will be invalidated.
350  pending_calls_.erase(it);
351  pending_calls_size_.fetch_sub(1, std::memory_order_relaxed);
352  // -- NO interrupt section end
353 
354  asio::read(*socket_, env->buf_seq(), ec);
355  promise.set_value(ec);
356 
357  return ec;
358 }
359 
360 void Channel::HandleStreamResponse(RpcId rpc_id) {
361  auto it = pending_calls_.find(rpc_id);
362  if (it == pending_calls_.end()) {
363  return; // Might happen if pending_calls_ was cancelled when we read the envelope.
364  }
365  PendingCall& call = it->second;
366  error_code ec = call.cb(*call.envelope);
367  if (!ec)
368  return;
369 
370  // eof - means successful finish of stream receival.
371  if (ec == error::eof) {
372  ec = system::error_code{};
373  }
374 
375  // We finished processing the stream.
376  // Keep the promise on the stack and erase from pending_calls_ first because
377  // set_value might context switch and invalidate 'it'.
378  auto promise = std::move(call.promise);
379  pending_calls_.erase(it);
380  promise.set_value(ec);
381 }
382 
383 void Channel::CancelPendingCalls(error_code ec) {
384  if (pending_calls_.empty())
385  return;
386 
387  PendingMap tmp;
388  tmp.swap(pending_calls_);
389  pending_calls_size_.store(0, std::memory_order_relaxed);
390 
391  // promise might interrupt so we want to swap into local variable to allow stable iteration
392  // over the map. In case pending_calls_ did not change we swap back to preserve already allocated
393  // map.
394  for (auto& c : tmp) {
395  c.second.promise.set_value(ec);
396  }
397  tmp.clear();
398  if (pending_calls_.empty()) {
399  tmp.swap(pending_calls_);
400  }
401 }
402 
403 } // namespace rpc
404 } // namespace util