frame_format.h
1 // Copyright 2018, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 #pragma once
5 
6 #include <string>
7 
8 #include <boost/asio/ip/tcp.hpp>
9 #include <boost/asio/read.hpp>
10 
11 #include "base/integral_types.h"
12 #include "util/asio/asio_utils.h"
13 
14 namespace util {
15 class FiberSyncSocket;
16 
17 namespace rpc {
18 
19 /*
20  Frame structure:
21  header str ("URPC") - 4 bytes
22  uint8 version + control size length + message size length 1 byte (4bits + 2bits + 2bits)
23  uint56 rpc_id - LE56
24  header_size - LE of control size length
25  message size - LE on message size length
26  BLOB char[header_size + message_size]:
27  PB - control packet of size header_size
28  PB - message request of size message_size
29 */
30 
31 // Also defined in rpc_connection.h. Seems to work.
32 typedef uint64_t RpcId;
33 
34 class Frame {
35  static const uint32 kHeaderVal;
36 
37  public:
38  typedef ::boost::asio::ip::tcp::socket socket_t;
39 
40  RpcId rpc_id;
41  uint32_t header_size;
42  uint32_t letter_size;
43 
44  Frame() : rpc_id(1), header_size(0), letter_size(0) {}
45  Frame(RpcId r, uint32_t cs, uint32_t ms) : rpc_id(r), header_size(cs), letter_size(ms) {}
46 
47  enum { kMinByteSize = 4 + 1 + 7 + 2, kMaxByteSize = 4 + 1 + 7 + 4 * 2 };
48 
49  bool operator==(const Frame& other) const {
50  return other.rpc_id == rpc_id && other.header_size == header_size &&
51  other.letter_size == letter_size;
52  }
53 
54  // friend std::ostream& operator<<(std::ostream& o, const Frame& frame);
55 
56  uint32 total_size() const { return header_size + letter_size; }
57 
58  // dest must be at least kMaxByteSize size.
59  // Returns the exact number of bytes written to the buffer (less or equal to kMaxByteSize).
60  unsigned Write(uint8* dest) const;
61 
62  template <typename SyncReadStream>::boost::system::error_code Read(SyncReadStream* input) {
63  static_assert(!std::is_same<SyncReadStream, socket_t>::value, "");
64 
65  // Adding little extra to allow 64 bit loads from any address in the buf.
66  uint8 buf[kMaxByteSize + 8];
67  using namespace boost;
68 
69  system::error_code ec;
70  asio::read(*input, asio::buffer(buf, kMinByteSize), ec);
71  if (ec)
72  return ec;
73  uint8_t code = DecodeStart(buf, ec);
74  if (ec)
75  return ec;
76 
77  const uint8 header_sz_len_minus1 = code & 3;
78  const uint8 msg_sz_len_minus1 = code >> 2;
79 
80  // We stored 2 necessary bytes of boths lens, if it was not enough lets fill em up.
81  if (code) {
82  size_t to_read = header_sz_len_minus1 + msg_sz_len_minus1;
83  auto mbuf = asio::buffer(buf + kMinByteSize, to_read);
84  asio::read(*input, mbuf, ec);
85  if (ec)
86  return ec;
87  }
88 
89  DecodeEnd(buf + 12, header_sz_len_minus1, msg_sz_len_minus1);
90 
91  return ec;
92  }
93 
94  private:
95  uint8_t DecodeStart(const uint8_t* src, ::boost::system::error_code& ec);
96  void DecodeEnd(const uint8_t* src, uint8_t hsz_len, uint8_t lsz_len);
97 };
98 
99 } // namespace rpc
100 } // namespace util