4 #include "util/asio/fiber_socket.h"     6 #include <boost/asio/connect.hpp>     9 #include "absl/base/attributes.h"    10 #include "base/logging.h"    11 #include "util/asio/io_context.h"    12 #include "util/fibers/event_count.h"    14 #define VSOCK(verbosity) VLOG(verbosity) << "sock[" << native_handle() << "] "    15 #define DVSOCK(verbosity) DVLOG(verbosity) << "sock[" << native_handle() << "] "    19 using namespace boost;
    21 using namespace chrono_literals;
    // Converts an arbitrary std::chrono duration into a whole number of
    // milliseconds, for use in log messages and millisecond-based APIs.
    //
    // @tparam Duration  any std::chrono::duration instantiation.
    // @param  d         the duration to convert.
    // @return the duration in milliseconds, truncated toward zero by
    //         duration_cast. Values that do not fit the return type are
    //         saturated instead of wrapping: negative durations yield 0 and
    //         durations longer than ~49.7 days yield the largest uint32_t.
    template <typename Duration> uint32_t ms_duration(const Duration& d) {
      // Largest representable result; used as the saturation ceiling.
      constexpr uint32_t kMaxMs = static_cast<uint32_t>(-1);

      // milliseconds::rep is a signed 64-bit-class integer, so the raw count can
      // be negative or exceed 32 bits; an implicit conversion would wrap silently.
      const auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(d).count();
      if (ms <= 0)
        return 0;
      if (static_cast<unsigned long long>(ms) >= kMaxMs)
        return kMaxMs;
      return static_cast<uint32_t>(ms);
    }
    29 using socket_t = FiberSocketImpl::next_layer_type;
    33   fibers::condition_variable_any cv_st;
    38   fibers::mutex connect_mu;
    39   chrono::steady_clock::duration connect_duration = chrono::seconds(2);
    45 FiberSocketImpl::~FiberSocketImpl() {
    46   VLOG(1) << 
"FiberSocketImpl::~FiberSocketImpl";
    52 FiberSocketImpl::FiberSocketImpl(socket_t&& sock, 
size_t rbuf_size)
    53     : rbuf_size_(rbuf_size), sock_(std::move(sock)), rbuf_(new uint8_t[rbuf_size]) {
    57 FiberSocketImpl::FiberSocketImpl(
const std::string& hname, 
const std::string& port, 
IoContext* cntx,
    59     : FiberSocketImpl(socket_t(cntx->raw_context()), rbuf_size) {
    60   VLOG(1) << 
"FiberSocketImpl::FiberSocketImpl " << sock_.native_handle();
    64   status_ = asio::error::not_connected;
    67 void FiberSocketImpl::Shutdown(error_code& ec) {
    70   auto handle = sock_.native_handle();
    73       DVLOG(1) << 
"Already closed " << handle;
    79     sock_.shutdown(socket_t::shutdown_both, ec);
    80     VSOCK(1) << 
"Sock Shutdown ";
    81     if (clientsock_data_) {
    82       DVSOCK(1) << 
"Sock Closed";
    85       if (clientsock_data_->worker.joinable())
    86         clientsock_data_->worker.join();
    87       DVSOCK(1) << 
"Worker Joined";
    91   if (clientsock_data_) {
    92     VSOCK(1) << 
"AwaitShutdown";
    93     clientsock_data_->io_cntx->AwaitSafe(cb);
    99 void FiberSocketImpl::SetStatus(
const error_code& ec, 
const char* where) {
   102     VSOCK(1) << 
"Setting status to " << ec << 
"/" << ec.message() << 
" at " << where;
   106 void FiberSocketImpl::WakeWorker() {
   109   clientsock_data_->worker_ev.notify();
   112 void FiberSocketImpl::InitiateConnection() {
   113   CHECK(!clientsock_data_ && (&io_cntx_->raw_context() == &sock_.get_executor().context()));
   115   clientsock_data_.reset(
new ClientData(io_cntx_));
   116   io_cntx_->Await([
this] {
   117     rslice_ = asio::buffer(rbuf_.get(), 0);
   118     clientsock_data_->worker = fibers::fiber(&FiberSocketImpl::ClientWorker, 
this);
   123 system::error_code FiberSocketImpl::ClientWaitToConnect(uint32_t ms) {
   124   if (!clientsock_data_) {
   125     InitiateConnection();
   127   using std::chrono::milliseconds;
   129   std::unique_lock<fibers::mutex> lock(clientsock_data_->connect_mu);
   130   clientsock_data_->cv_st.wait_for(lock, milliseconds(ms), [
this] { 
return !status_; });
   135 void FiberSocketImpl::ClientWorker() {
   138       VSOCK(1) << 
"Status " << status_;
   139       error_code ec = Reconnect(hname_, port_);
   140       VSOCK(1) << 
"After  Reconnect: " << ec << 
"/" << ec.message() << 
" is_open: " << is_open_;
   141       if (ec && is_open_) {  
   142         this_fiber::sleep_for(10ms);
   146     DCHECK(sock_.non_blocking());
   149     VSOCK(3) << 
"BeforeAsyncWait";
   150     sock_.async_wait(socket_t::wait_read, fibers_ext::yield[ec]);
   152       LOG_IF(ERROR, is_open_) << 
"AsyncWait: " << ec.message();
   156     size_t read_capacity = rbuf_size_ - rslice_.size();
   157     if (read_state_ == READ_IDLE && read_capacity) {
   158       uint8_t* next = static_cast<uint8_t*>(rslice_.data()) + rslice_.size();
   162       size_t read_cnt = sock_.receive(asio::mutable_buffer(next, read_capacity), 0, status_);
   164         rslice_ = asio::mutable_buffer(rslice_.data(), rslice_.size() + read_cnt);
   165       } 
else if (status_ == system::errc::resource_unavailable_try_again) {
   168         VSOCK(1) << 
"Receive: " << status_.message() << 
", read_cnt " << 0;
   175     VLOG(3) << 
"BeforeCvReadWait";
   176     auto should_iterate = [
this] {
   177       return !is_open() || (read_state_ == READ_IDLE && rslice_.size() != rbuf_size_);
   180     clientsock_data_->worker_ev.await(should_iterate);
   182     VLOG(3) << 
"WorkerIteration: ";
   184   VLOG(1) << 
"FiberSocketReadExit";
   187 ABSL_MUST_USE_RESULT system::error_code OpenAndConfigure(
bool keep_alive,
   188                                                          asio::ip::tcp::socket& sock) {
   189   using namespace asio::ip;
   190   system::error_code ec;
   192   if (sock.is_open()) {
   194     CHECK(!sock.is_open());
   196   sock.open(tcp::v4(), ec);
   199   CHECK(sock.is_open());
   202   socket_t::reuse_address opt(
true);
   203   sock.set_option(opt, ec);
   206   socket_t::keep_alive opt2(keep_alive);
   207   sock.set_option(opt2, ec);
   211   sock.non_blocking(
true, ec);
   214   return system::error_code{};
   217 system::error_code FiberSocketImpl::Reconnect(
const std::string& hname,
   218                                               const std::string& service) {
   219   DCHECK(clientsock_data_);
   220   using namespace asio::ip;
   222   auto& asio_io_cntx = clientsock_data_->io_cntx->raw_context();
   224   tcp::resolver resolver(asio_io_cntx);
   226   system::error_code ec;
   228   VSOCK(1) << 
"Before AsyncResolve " << is_open();
   231   auto results = resolver.async_resolve(tcp::v4(), hname, service, fibers_ext::yield[ec]);
   233     VLOG(1) << 
"Resolver error " << ec;
   236   DVSOCK(1) << 
"After AsyncResolve, got " << results.size() << 
" results";
   238   asio::steady_timer timer(asio_io_cntx, clientsock_data_->connect_duration);
   239   timer.async_wait([&](
const system::error_code& ec) {
   241       VSOCK(1) << 
"Cancelling after " << detail::ms_duration(clientsock_data_->connect_duration)
   249   size_t result_index = 0;
   250   for (
const auto& remote_dest : results) {
   251     ec = OpenAndConfigure(keep_alive_, sock_);
   254     sock_.async_connect(remote_dest, fibers_ext::yield[ec]);
   259     if (!ec || ec == asio::error::operation_aborted)
   261     VSOCK(2) << 
"Connect iteration " << result_index << 
", error " << ec;
   264   VSOCK(1) << 
"After async_connect " << ec << 
" result index " << result_index;
   267     SetStatus(ec, 
"reconnect");
   269     CHECK(sock_.non_blocking()) << native_handle();
   271     socket_t::keep_alive ka_opt;
   272     sock_.get_option(ka_opt, ec);
   273     CHECK_EQ(keep_alive_, ka_opt.value()) << keep_alive_;
   276     std::lock_guard<fibers::mutex> lock(clientsock_data_->connect_mu);
   281     clientsock_data_->cv_st.notify_one();
   286 IoContext& FiberSocketImpl::context() {
   287   CHECK(clientsock_data_);
   288   return *clientsock_data_->io_cntx;