6 #include <boost/fiber/future.hpp> 8 #include "absl/container/flat_hash_map.h" 10 #include "base/RWSpinLock.h" 11 #include "base/wheel_timer.h" 13 #include "util/asio/fiber_socket.h" 14 #include "util/asio/periodic_task.h" 16 #include "util/rpc/frame_format.h" 17 #include "util/rpc/rpc_envelope.h" 28 using error_code = FiberSyncSocket::error_code;
29 using future_code_t = boost::fibers::future<error_code>;
33 using MessageCallback = std::function<error_code(
Envelope&)>;
38 Channel(
const std::string& hostname,
const std::string& service,
IoContext* cntx)
46 error_code Connect(uint32_t ms);
52 future_code_t Send(uint32_t deadline_msec,
Envelope* envelope);
56 error_code SendSync(uint32_t deadline_msec,
Envelope* envelope) {
57 return Send(deadline_msec, envelope).get();
65 error_code SendAndReadStream(
Envelope* msg, MessageCallback cb);
74 void CancelPendingCalls(error_code ec);
75 error_code ReadEnvelope();
76 error_code PresendChecks();
77 error_code FlushSends();
78 error_code FlushSendsGuarded();
80 void CancelSentBufferGuarded(error_code ec);
81 void ExpirePending(RpcId
id);
83 bool OutgoingBufLock() {
84 bool lock_exclusive = !socket_->context().InContextThread();
89 buf_lock_.lock_shared();
91 return lock_exclusive;
94 void OutgoingBufUnlock(
bool exclusive) {
98 buf_lock_.unlock_shared();
101 void HandleStreamResponse(RpcId rpc_id);
103 class ExpiryEvent :
public base::TimerEventInterface {
105 explicit ExpiryEvent(
Channel* me) : me_(me) {
109 void execute()
final { me_->ExpirePending(id_); }
111 void set_id(RpcId i) { id_ = i; }
118 RpcId next_send_rpc_id_ = 1;
119 std::unique_ptr<FiberSyncSocket> socket_;
121 typedef boost::fibers::promise<error_code> EcPromise;
129 PendingCall(EcPromise p,
Envelope* env, MessageCallback mcb = MessageCallback{})
130 : promise(std::move(p)), envelope(env), cb(std::move(mcb)) {
133 std::unique_ptr<ExpiryEvent> expiry_event;
141 typedef std::pair<RpcId, PendingCall> SendItem;
143 folly::RWSpinLock buf_lock_;
144 std::vector<SendItem> outgoing_buf_;
145 std::atomic_ulong outgoing_buf_size_{0};
147 boost::fibers::fiber read_fiber_, flush_fiber_;
148 boost::fibers::mutex send_mu_;
151 std::vector<boost::asio::const_buffer> write_seq_;
152 base::PODArray<std::array<uint8_t, rpc::Frame::kMaxByteSize>> frame_buf_;
154 typedef absl::flat_hash_map<RpcId, PendingCall> PendingMap;
155 PendingMap pending_calls_;
156 std::atomic_ulong pending_calls_size_{0};
159 base::TimerWheel expire_timer_;
160 std::unique_ptr<PeriodicTask> expiry_task_;