fiber_socket_impl.h
1 // Copyright 2019, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 #pragma once
5 
6 #include <boost/asio/ip/tcp.hpp>
7 
8 #include "util/asio/yield.h"
9 
10 namespace util {
11 class IoContext;
12 
13 namespace detail {
14 
16  public:
17  using error_code = ::boost::system::error_code;
18  using next_layer_type = ::boost::asio::ip::tcp::socket;
19 
20  // C'tor can be called from any thread.
21  FiberSocketImpl(next_layer_type&& sock, size_t rbuf_size);
22 
23  // Creates a client socket.
24  FiberSocketImpl(const std::string& hname, const std::string& port,
25  IoContext* cntx, size_t rbuf_size);
26 
27  // FiberSocketImpl can not be moveable due to attached fiber.
28  FiberSocketImpl(FiberSocketImpl&& other) = delete;
29 
30  ~FiberSocketImpl();
31 
32 
33  // Waits for client socket to become connected. Can be called from any thread.
34  // Please note that connection status might be stale if called from a foreigh thread.
35  error_code ClientWaitToConnect(uint32_t ms);
36 
37  // Read/Write functions should be called from IoContext thread.
38  // (fiber) SyncRead interface:
39  // https://www.boost.org/doc/libs/1_69_0/doc/html/boost_asio/reference/SyncReadStream.html
40  template <typename MBS> size_t read_some(const MBS& bufs, error_code& ec);
41 
42 
43  // SyncWrite interface:
44  // https://www.boost.org/doc/libs/1_69_0/doc/html/boost_asio/reference/SyncWriteStream.html
45  template <typename BS> size_t write_some(const BS& bufs, error_code& ec);
46 
47  next_layer_type::native_handle_type native_handle() { return sock_.native_handle(); }
48 
49  bool is_open() const { return is_open_; }
50 
51  // Closes the socket and shuts down its background processes if needed.
52  // For client socket it's thread-safe but for non-client it should be called
53  // from the socket thread.
54  void Shutdown(error_code& ec);
55 
56  next_layer_type::endpoint_type remote_endpoint(error_code& ec) const {
57  return sock_.remote_endpoint(ec);
58  }
59 
60  error_code status() const { return status_;}
61 
62  // For debugging.
63  next_layer_type& next_layer() { return sock_; }
64 
65  // For debugging/testing.
66  IoContext& context();
67 
68  bool keep_alive() const { return keep_alive_;}
69  void set_keep_alive(bool flag) { keep_alive_ = flag;}
70 
71  private:
72  // Asynchronous function that make this socket a client socket and initiates client-flow
73  // connection process. Should be called only once. Can be called from any thread.
74  void InitiateConnection();
75 
76  void ClientWorker();
77 
78  void WakeWorker();
79  error_code Reconnect(const std::string& hname, const std::string& service);
80  void SetStatus(const error_code& ec, const char* where);
81 
82  error_code status_;
83 
84  // socket.is_open() is unreliable and does not reflect close() status even if is called
85  // from the same thread.
86  bool is_open_ = true, keep_alive_ = false;
87  enum State { READ_IDLE, READ_ACTIVE } read_state_ = READ_IDLE;
88 
89  size_t rbuf_size_;
90  next_layer_type sock_;
91  std::unique_ptr<uint8_t[]> rbuf_;
92  ::boost::asio::mutable_buffer rslice_;
93 
94  std::string hname_, port_;
95  IoContext* io_cntx_ = nullptr;
96 
97  // Stuff related to client sockets.
98  struct ClientData;
99  std::unique_ptr<ClientData> clientsock_data_;
100 };
101 
102 template <typename MBS> size_t FiberSocketImpl::read_some(const MBS& bufs, error_code& ec) {
103  using namespace boost;
104  if (rslice_.size()) {
105  size_t copied = asio::buffer_copy(bufs, rslice_);
106  if (rslice_.size() == copied) {
107  rslice_ = asio::mutable_buffer(rbuf_.get(), 0);
108  } else {
109  rslice_ += copied;
110  }
111  if (clientsock_data_) {
112  WakeWorker(); // For client socket case.
113  }
114  return copied;
115  }
116 
117  if (status_) {
118  ec = status_;
119  return 0;
120  }
121 
122  size_t user_size = asio::buffer_size(bufs);
123  auto new_seq = make_buffer_seq(bufs, asio::mutable_buffer(rbuf_.get(), rbuf_size_));
124 
125  size_t read_size = sock_.read_some(new_seq, ec);
126  if (ec == asio::error::would_block) {
127  read_state_ = READ_ACTIVE;
128  read_size = sock_.async_read_some(new_seq, fibers_ext::yield[ec]);
129  read_state_ = READ_IDLE;
130  }
131  if (ec) {
132  SetStatus(ec, "read_some");
133  }
134  if (clientsock_data_) {
135  WakeWorker(); // For client socket case.
136  }
137  if (read_size > user_size) {
138  rslice_ = asio::mutable_buffer(rbuf_.get(), read_size - user_size);
139  read_size = user_size;
140  }
141  return read_size;
142 }
143 
144 template <typename BS> size_t FiberSocketImpl::write_some(const BS& bufs, error_code& ec) {
145  size_t res = sock_.write_some(bufs, ec);
146  if (ec == ::boost::asio::error::would_block) {
147  return sock_.async_write_some(bufs, fibers_ext::yield[ec]);
148  }
149  return res;
150 }
151 
152 } // namespace detail
153 
154 
155 } // namespace util