channel.h
1 // Copyright 2018, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 #pragma once
5 
6 #include <boost/fiber/future.hpp>
7 
8 #include "absl/container/flat_hash_map.h"
9 
10 #include "base/RWSpinLock.h" //
11 #include "base/wheel_timer.h"
12 
13 #include "util/asio/fiber_socket.h"
14 #include "util/asio/periodic_task.h"
15 
16 #include "util/rpc/frame_format.h"
17 #include "util/rpc/rpc_envelope.h"
18 
19 namespace util {
20 namespace rpc {
21 
22 // Fiber-safe rpc client.
23 // Send(..) is also thread-safe and may be used from multiple threads differrent than
24 // of IoContext containing the channel, however it might incur performance penalty.
25 // Therefore to achieve maximal performance - it's advised to use Channel from IoContext thread.
26 class Channel {
27  public:
28  using error_code = FiberSyncSocket::error_code;
29  using future_code_t = boost::fibers::future<error_code>;
30 
31  // Returns boost::asio::error::eof if the Stream has been finished,
32  // if bool(error_code) returns true, aborts receiving the stream and returns the error.
33  using MessageCallback = std::function<error_code(Envelope&)>;
34 
35  Channel(FiberSyncSocket* socket) : socket_(socket) {
36  }
37 
38  Channel(const std::string& hostname, const std::string& service, IoContext* cntx)
39  : Channel(new FiberSyncSocket(hostname, service, cntx)) {
40  }
41 
42  ~Channel();
43 
44  // Blocks at least for 'ms' milliseconds to connect to the host.
45  // Should be called once during the initialization phase before sending the requests.
46  error_code Connect(uint32_t ms);
47 
48  // Thread-safe function.
49  // Sends the envelope and returns the future to the response status code.
50  // Future is realized when response is received and serialized into the same envelope.
51  // Send() might block therefore it should not be called directly from IoContext loop (post).
52  future_code_t Send(uint32_t deadline_msec, Envelope* envelope);
53 
54  // Fiber-blocking call. Sends and waits until the response is back.
55  // Similarly to Send, the response is written into the same envelope.
56  error_code SendSync(uint32_t deadline_msec, Envelope* envelope) {
57  return Send(deadline_msec, envelope).get();
58  }
59 
60  // Sends a msg and wait to receive a stream of envelopes.
61  // For each envelope MessageCallback is called.
62  // MessageCallback should return True if more items are expected in the stream.
63  // i.e. Envelope should contain stream-related information to allow MessageCallback to
64  // decide whether more envelopes should come.
65  error_code SendAndReadStream(Envelope* msg, MessageCallback cb);
66 
67  // Blocks the calling fiber until all the background processes finish.
68  void Shutdown();
69 
70  private:
71  void ReadFiber();
72  void FlushFiber();
73 
74  void CancelPendingCalls(error_code ec);
75  error_code ReadEnvelope();
76  error_code PresendChecks();
77  error_code FlushSends();
78  error_code FlushSendsGuarded();
79 
80  void CancelSentBufferGuarded(error_code ec);
81  void ExpirePending(RpcId id);
82 
83  bool OutgoingBufLock() {
84  bool lock_exclusive = !socket_->context().InContextThread();
85 
86  if (lock_exclusive)
87  buf_lock_.lock();
88  else
89  buf_lock_.lock_shared();
90 
91  return lock_exclusive;
92  }
93 
94  void OutgoingBufUnlock(bool exclusive) {
95  if (exclusive)
96  buf_lock_.unlock();
97  else
98  buf_lock_.unlock_shared();
99  }
100 
101  void HandleStreamResponse(RpcId rpc_id);
102 
103  class ExpiryEvent : public base::TimerEventInterface {
104  public:
105  explicit ExpiryEvent(Channel* me) : me_(me) {
106  }
107 
108  // ExpiryEvent can not be moved because its address is registered inside TimerWheel.
109  void execute() final { me_->ExpirePending(id_); }
110 
111  void set_id(RpcId i) { id_ = i; }
112 
113  private:
114  Channel* me_;
115  RpcId id_ = 0;
116  };
117 
118  RpcId next_send_rpc_id_ = 1;
119  std::unique_ptr<FiberSyncSocket> socket_;
120 
121  typedef boost::fibers::promise<error_code> EcPromise;
122 
123  struct PendingCall {
124  EcPromise promise;
125  Envelope* envelope;
126 
127  MessageCallback cb; // for Stream response.
128 
129  PendingCall(EcPromise p, Envelope* env, MessageCallback mcb = MessageCallback{})
130  : promise(std::move(p)), envelope(env), cb(std::move(mcb)) {
131  }
132 
133  std::unique_ptr<ExpiryEvent> expiry_event;
134  };
135 
136  // The flow is as follows:
137  // Send fiber enques requests into outgoing_buf_. It's thread-safe, protected by spinlock.
138  // FlushSendsGuarded flushes outgoing_buf_ into the socket. ReadFiber receives envelopes and
139  // triggers the receive flow: it might call Stream Handler and at the end it realizes
140  // future signalling the end of rpc call.
141  typedef std::pair<RpcId, PendingCall> SendItem;
142 
143  folly::RWSpinLock buf_lock_;
144  std::vector<SendItem> outgoing_buf_; // protected by buf_lock_.
145  std::atomic_ulong outgoing_buf_size_{0};
146 
147  boost::fibers::fiber read_fiber_, flush_fiber_;
148  boost::fibers::mutex send_mu_; // protects FlushSendsGuarded.
149 
150  // Used in FlushSendsGuarded to flush buffers efficiently.
151  std::vector<boost::asio::const_buffer> write_seq_;
152  base::PODArray<std::array<uint8_t, rpc::Frame::kMaxByteSize>> frame_buf_;
153 
154  typedef absl::flat_hash_map<RpcId, PendingCall> PendingMap;
155  PendingMap pending_calls_;
156  std::atomic_ulong pending_calls_size_{0};
157 
158  // Handles expiration flow.
159  base::TimerWheel expire_timer_;
160  std::unique_ptr<PeriodicTask> expiry_task_;
161 };
162 
163 } // namespace rpc
164 } // namespace util