fiber_socket.cc
// Copyright 2018, Beeri 15. All rights reserved.
// Author: Roman Gershman (romange@gmail.com)
//
#include "util/asio/fiber_socket.h"

#include <boost/asio/connect.hpp>
#include <chrono>

#include "absl/base/attributes.h"
#include "base/logging.h"
#include "util/asio/io_context.h"
#include "util/fibers/event_count.h"

#define VSOCK(verbosity) VLOG(verbosity) << "sock[" << native_handle() << "] "
#define DVSOCK(verbosity) DVLOG(verbosity) << "sock[" << native_handle() << "] "

namespace util {

using namespace boost;
using namespace std;
using namespace chrono_literals;

namespace detail {

template <typename Duration> uint32_t ms_duration(const Duration& d) {
  return chrono::duration_cast<chrono::milliseconds>(d).count();
}

using socket_t = FiberSocketImpl::next_layer_type;

// Per-connection state for client (connecting) sockets: the worker fiber that maintains
// the connection, synchronization primitives for connect waiters, and the owning IoContext.
struct FiberSocketImpl::ClientData {
  fibers::fiber worker;
  fibers::condition_variable_any cv_st;
  fibers_ext::EventCount worker_ev;

  IoContext* io_cntx;

  fibers::mutex connect_mu;
  chrono::steady_clock::duration connect_duration = chrono::seconds(2);

  ClientData(IoContext* io) : io_cntx(io) {
  }
};

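// The destructor performs a full Shutdown(); for client sockets this also joins the
// worker fiber before the object is destroyed.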
FiberSocketImpl::~FiberSocketImpl() {
  VLOG(1) << "FiberSocketImpl::~FiberSocketImpl";

  error_code ec;
  Shutdown(ec);
}

FiberSocketImpl::FiberSocketImpl(socket_t&& sock, size_t rbuf_size)
    : rbuf_size_(rbuf_size), sock_(std::move(sock)), rbuf_(new uint8_t[rbuf_size]) {
}

// Creates a client socket.
FiberSocketImpl::FiberSocketImpl(const std::string& hname, const std::string& port, IoContext* cntx,
                                 size_t rbuf_size)
    : FiberSocketImpl(socket_t(cntx->raw_context()), rbuf_size) {
  VLOG(1) << "FiberSocketImpl::FiberSocketImpl " << sock_.native_handle();
  hname_ = hname;
  port_ = port;
  io_cntx_ = cntx;
  status_ = asio::error::not_connected;
}
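
// Illustrative usage sketch (hypothetical; in practice this class is normally driven by
// the higher-level util/asio wrappers rather than used directly):
//
//   IoContext* cntx = ...;                                   // an initialized IoContext
//   detail::FiberSocketImpl sock("localhost", "8080", cntx, 1 << 12);
//   auto ec = sock.ClientWaitToConnect(2000);                // wait up to 2000 ms
//   if (!ec) {
//     // Connected; the worker fiber keeps reconnecting and prefetching in the background.
//   }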

// Cancels pending operations, shuts the socket down and, for client sockets, wakes and
// joins the worker fiber. The callback runs inside the socket's IoContext thread (via
// AwaitSafe) so that it does not race with the worker fiber.
void FiberSocketImpl::Shutdown(error_code& ec) {
  if (!is_open_)
    return;
  auto handle = sock_.native_handle();
  auto cb = [&] {
    if (!is_open_) {
      DVLOG(1) << "Already closed " << handle;
      return;
    }

    is_open_ = false;
    sock_.cancel(ec);
    sock_.shutdown(socket_t::shutdown_both, ec);
    VSOCK(1) << "Sock Shutdown ";
    if (clientsock_data_) {
      DVSOCK(1) << "Sock Closed";
      WakeWorker();

      if (clientsock_data_->worker.joinable())
        clientsock_data_->worker.join();
      DVSOCK(1) << "Worker Joined";
    }
  };

  if (clientsock_data_) {
    VSOCK(1) << "AwaitShutdown";
    clientsock_data_->io_cntx->AwaitSafe(cb);
  } else {
    cb();
  }
}

void FiberSocketImpl::SetStatus(const error_code& ec, const char* where) {
  status_ = ec;
  if (ec) {
    VSOCK(1) << "Setting status to " << ec << "/" << ec.message() << " at " << where;
  }
}

void FiberSocketImpl::WakeWorker() {
  // notify() is cheap; it wakes the worker fiber, which at this point is usually
  // suspended on worker_ev.
  clientsock_data_->worker_ev.notify();
}

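// Allocates the client state and spawns ClientWorker as a fiber inside the socket's
// IoContext; rslice_ is reset to an empty window over rbuf_ before the worker starts.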
void FiberSocketImpl::InitiateConnection() {
  CHECK(!clientsock_data_ && (&io_cntx_->raw_context() == &sock_.get_executor().context()));

  clientsock_data_.reset(new ClientData(io_cntx_));
  io_cntx_->Await([this] {
    rslice_ = asio::buffer(rbuf_.get(), 0);
    clientsock_data_->worker = fibers::fiber(&FiberSocketImpl::ClientWorker, this);
  });
}

// Waits for the socket to become connected, for at most ms milliseconds. Can be called
// from any thread.
system::error_code FiberSocketImpl::ClientWaitToConnect(uint32_t ms) {
  if (!clientsock_data_) {
    InitiateConnection();
  }
  using std::chrono::milliseconds;

  std::unique_lock<fibers::mutex> lock(clientsock_data_->connect_mu);
  clientsock_data_->cv_st.wait_for(lock, milliseconds(ms), [this] { return !status_; });

  return status_;
}

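// Connection-maintenance loop, run as a fiber inside the IoContext. While the socket is
// open it (a) reconnects whenever status_ holds an error, (b) otherwise waits for the
// socket to become readable and prefetches whatever is available into rbuf_, and
// (c) suspends on worker_ev once the buffer is full or a read is in progress.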
void FiberSocketImpl::ClientWorker() {
  while (is_open_) {
    if (status_) {
      VSOCK(1) << "Status " << status_;
      error_code ec = Reconnect(hname_, port_);
      VSOCK(1) << "After Reconnect: " << ec << "/" << ec.message() << " is_open: " << is_open_;
      if (ec && is_open_) {  // Back off before the next reconnect attempt, but only while open.
        this_fiber::sleep_for(10ms);
      }
      continue;
    }
    DCHECK(sock_.non_blocking());

    error_code ec;
    VSOCK(3) << "BeforeAsyncWait";
    sock_.async_wait(socket_t::wait_read, fibers_ext::yield[ec]);
    if (ec) {
      LOG_IF(ERROR, is_open_) << "AsyncWait: " << ec.message();
      continue;
    }

    size_t read_capacity = rbuf_size_ - rslice_.size();
    if (read_state_ == READ_IDLE && read_capacity) {
      uint8_t* next = static_cast<uint8_t*>(rslice_.data()) + rslice_.size();

      // Direct non-blocking receive: async_wait already told us the socket is readable,
      // so this call does not context-switch.
      size_t read_cnt = sock_.receive(asio::mutable_buffer(next, read_capacity), 0, status_);
      if (!status_) {
        rslice_ = asio::mutable_buffer(rslice_.data(), rslice_.size() + read_cnt);
      } else if (status_ == system::errc::resource_unavailable_try_again) {
        status_.clear();
      } else {
        VSOCK(1) << "Receive: " << status_.message() << ", read_cnt " << read_cnt;
      }
      continue;
    }

    this_fiber::yield();

    VLOG(3) << "BeforeCvReadWait";
    auto should_iterate = [this] {
      return !is_open() || (read_state_ == READ_IDLE && rslice_.size() != rbuf_size_);
    };

    clientsock_data_->worker_ev.await(should_iterate);

    VLOG(3) << "WorkerIteration: ";
  }
  VLOG(1) << "FiberSocketReadExit";
}

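// Reopens the socket from scratch: closes any existing descriptor, opens an IPv4 TCP
// socket, enables address reuse, applies the requested keep-alive setting and switches
// the socket into non-blocking mode. Returns the first error encountered, if any.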
ABSL_MUST_USE_RESULT system::error_code OpenAndConfigure(bool keep_alive,
                                                         asio::ip::tcp::socket& sock) {
  using namespace asio::ip;
  system::error_code ec;

  // Close before (re)opening and configuring, so that the same socket descriptor can be
  // reused across reconnects.
  if (sock.is_open()) {
    sock.close(ec);
    CHECK(!sock.is_open());
  }
  sock.open(tcp::v4(), ec);
  if (ec)
    return ec;
  CHECK(sock.is_open());

  socket_t::reuse_address opt(true);
  sock.set_option(opt, ec);
  if (ec)
    return ec;
  socket_t::keep_alive opt2(keep_alive);
  sock.set_option(opt2, ec);
  if (ec)
    return ec;

  sock.non_blocking(true, ec);
  if (ec)
    return ec;
  return system::error_code{};
}

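// (Re)establishes the client connection: resolves hname:service, arms a timer that
// cancels the socket after connect_duration, then tries each resolved endpoint in turn.
// On success, status_ is cleared under connect_mu and cv_st is notified so that
// ClientWaitToConnect waiters wake up.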
system::error_code FiberSocketImpl::Reconnect(const std::string& hname,
                                              const std::string& service) {
  DCHECK(clientsock_data_);
  using namespace asio::ip;

  auto& asio_io_cntx = clientsock_data_->io_cntx->raw_context();

  tcp::resolver resolver(asio_io_cntx);

  system::error_code ec;

  VSOCK(1) << "Before AsyncResolve " << is_open();

  // It seems that the resolver waits for ~10s and ignores cancellation.
  auto results = resolver.async_resolve(tcp::v4(), hname, service, fibers_ext::yield[ec]);
  if (ec) {
    VLOG(1) << "Resolver error " << ec;
    return ec;
  }
  DVSOCK(1) << "After AsyncResolve, got " << results.size() << " results";

  asio::steady_timer timer(asio_io_cntx, clientsock_data_->connect_duration);
  timer.async_wait([&](const system::error_code& ec) {
    if (!ec) {  // The timer expired before being cancelled.
      VSOCK(1) << "Cancelling after " << detail::ms_duration(clientsock_data_->connect_duration)
               << " millis.";
      sock_.cancel();
    }
  });

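  // If the timer fires it cancels sock_, so a pending async_connect below completes with
  // operation_aborted; the loop treats that as "stop trying more endpoints".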
  size_t result_index = 0;
  for (const auto& remote_dest : results) {
    ec = OpenAndConfigure(keep_alive_, sock_);
    if (ec)
      break;
    sock_.async_connect(remote_dest, fibers_ext::yield[ec]);

    // Break on success. Also break on operation_aborted: it means the attempt was
    // cancelled because we went over the connect_duration limit.
    if (!ec || ec == asio::error::operation_aborted)
      break;
    VSOCK(2) << "Connect iteration " << result_index << ", error " << ec;
    ++result_index;
  }
  VSOCK(1) << "After async_connect " << ec << " result index " << result_index;

  if (ec) {
    SetStatus(ec, "reconnect");
  } else {
    CHECK(sock_.non_blocking()) << native_handle();

    socket_t::keep_alive ka_opt;
    sock_.get_option(ka_opt, ec);
    CHECK_EQ(keep_alive_, ka_opt.value()) << keep_alive_;

    // Take the mutex so that ClientWaitToConnect is thread-safe.
    std::lock_guard<fibers::mutex> lock(clientsock_data_->connect_mu);
    status_.clear();

    // notify_one wakes only waiters that are already suspended in cv_st.wait(); that is
    // why status_ must be changed under the mutex.
    clientsock_data_->cv_st.notify_one();
  }
  return status_;
}

IoContext& FiberSocketImpl::context() {
  CHECK(clientsock_data_);
  return *clientsock_data_->io_cntx;
}

}  // namespace detail
}  // namespace util