rpc_conn_handler.h
1 // Copyright 2018, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 
5 #pragma once
6 
7 #include "base/object_pool.h"
8 
9 #include "util/asio/io_context.h"
10 #include "util/asio/connection_handler.h"
11 #include "util/rpc/frame_format.h"
12 #include "util/rpc/rpc_connection.h"
13 
14 namespace util {
15 namespace rpc {
16 
17 using namespace boost;
18 
19 // Generally it's an object called from a single fiber.
20 // However FlushFiber runs from a background fiber and makes sure that all outgoing writes
21 // are flushed to socket.
23  public:
24  // bridge is owned by RpcConnectionHandler instance.
25  // RpcConnectionHandler is created in acceptor thread and not in the socket thread.
28 
// Overrides a base-class virtual and is marked `final`, so further-derived classes
// cannot override it. Returns a system::error_code.
// NOTE(review): presumably reads and dispatches one incoming RPC request on this
// connection - confirm against the implementation file.
29  system::error_code HandleRequest() final override;
30 
31  private:
// Overrides the base-class FlushWrites hook; returns bool.
// NOTE(review): presumably the entry point used by the background flush fiber and
// a wrapper that takes wr_mu_ before calling FlushWritesInternal - confirm in the .cc.
32  bool FlushWrites() override;
33
34  // Protected by wr_mu_ to preserve transactional semantics.
35  // Returns true if the flush occurred.
36  bool FlushWritesInternal();
37 
38  // The following methods run in the socket thread (the thread that calls HandleRequest).
// Both are marked `final`, so classes deriving from this handler cannot override them.
39  void OnOpenSocket() final;
40  void OnCloseSocket() final;
41 
// Connection bridge owned exclusively by this handler (held through unique_ptr).
42  std::unique_ptr<ConnectionBridge> bridge_;
43 
44  struct RpcItem : public intrusive::slist_base_hook<intrusive::link_mode<intrusive::normal_link>> {
45  RpcId id;
46  Envelope envelope;
47 
48  RpcItem() = default;
49  RpcItem(RpcId i, Envelope env) : id(i), envelope(std::move(env)) {
50  }
51 
52  auto buf_seq() {
53  return envelope.buf_seq();
54  }
55  };
// Intrusive singly linked list of RpcItem. cache_last<true> makes the list keep a
// pointer to its last element, so appending at the tail is a constant-time operation.
56  using ItemList = intrusive::slist<RpcItem, intrusive::cache_last<true>>;
57 
// NOTE(review): presumably the last error recorded for this connection - confirm in the .cc.
58  system::error_code ec_;
// Pool of RpcItem objects.
// NOTE(review): presumably recycles the nodes that get linked into outgoing_buf_ - confirm.
59  base::ObjectPool<RpcItem> rpc_items_;
// Intrusive list of RpcItem.
// NOTE(review): the name suggests items queued for writing to the socket - confirm.
60  ItemList outgoing_buf_;
61
// Fiber-level mutex; the comment on FlushWritesInternal names it as the lock guarding flushes.
62  fibers::mutex wr_mu_;
// Sequence of asio const buffers.
// NOTE(review): presumably the gather list assembled for a single socket write - confirm.
63  std::vector<asio::const_buffer> write_seq_;
// Each element is a fixed-size byte array of rpc::Frame::kMaxByteSize bytes.
// NOTE(review): presumably scratch space for serialized frame headers - confirm.
64  base::PODArray<std::array<uint8_t, rpc::Frame::kMaxByteSize>> frame_buf_;
// Starts at 0.
// NOTE(review): presumably counts flush requests - confirm where it is incremented.
65  uint64_t req_flushes_ = 0;
66 };
67 
68 } // namespace rpc
69 } // namespace util