8 #include <boost/asio/ip/tcp.hpp> 9 #include <boost/asio/read.hpp> 11 #include "base/integral_types.h" 12 #include "util/asio/asio_utils.h" 15 class FiberSyncSocket;
// 64-bit unsigned type used for RPC ids; Frame's three-argument constructor takes one as its
// first parameter.
using RpcId = uint64_t;
// Static uint32 constant. It is only declared on this line (no initializer), so its value is
// defined elsewhere.
// NOTE(review): presumably the marker value that begins every serialized frame - confirm
// against the definition.
35 static const uint32 kHeaderVal;
38 typedef ::boost::asio::ip::tcp::socket socket_t;
44 Frame() : rpc_id(1), header_size(0), letter_size(0) {}
45 Frame(RpcId r, uint32_t cs, uint32_t ms) : rpc_id(r), header_size(cs), letter_size(ms) {}
// Byte-size bounds used when reading a frame: kMinByteSize evaluates to 14 and kMaxByteSize
// to 20. Read() first fetches kMinByteSize bytes and sizes its stack buffer from kMaxByteSize.
// Both values share the fixed 4 + 1 + 7 prefix and differ only in the last term (2 vs 4 * 2).
// NOTE(review): presumably the last term covers the two size fields at 1 byte each (minimum)
// vs 4 bytes each (maximum) - confirm against the encoder.
enum {
  kMinByteSize = 4 + 1 + 7 + 2,
  kMaxByteSize = 4 + 1 + 7 + 4 * 2
};
49 bool operator==(
const Frame& other)
const {
50 return other.rpc_id == rpc_id && other.header_size == header_size &&
51 other.letter_size == letter_size;
56 uint32 total_size()
const {
return header_size + letter_size; }
60 unsigned Write(uint8* dest)
const;
62 template <typename SyncReadStream>::boost::system::error_code Read(SyncReadStream* input) {
63 static_assert(!std::is_same<SyncReadStream, socket_t>::value,
"");
66 uint8 buf[kMaxByteSize + 8];
67 using namespace boost;
69 system::error_code ec;
70 asio::read(*input, asio::buffer(buf, kMinByteSize), ec);
73 uint8_t code = DecodeStart(buf, ec);
77 const uint8 header_sz_len_minus1 = code & 3;
78 const uint8 msg_sz_len_minus1 = code >> 2;
82 size_t to_read = header_sz_len_minus1 + msg_sz_len_minus1;
83 auto mbuf = asio::buffer(buf + kMinByteSize, to_read);
84 asio::read(*input, mbuf, ec);
89 DecodeEnd(buf + 12, header_sz_len_minus1, msg_sz_len_minus1);
95 uint8_t DecodeStart(
const uint8_t* src, ::boost::system::error_code& ec);
96 void DecodeEnd(
const uint8_t* src, uint8_t hsz_len, uint8_t lsz_len);