4 #include "util/asio/fiber_socket.h" 6 #include <boost/asio/connect.hpp> 9 #include "absl/base/attributes.h" 10 #include "base/logging.h" 11 #include "util/asio/io_context.h" 12 #include "util/fibers/event_count.h" 14 #define VSOCK(verbosity) VLOG(verbosity) << "sock[" << native_handle() << "] " 15 #define DVSOCK(verbosity) DVLOG(verbosity) << "sock[" << native_handle() << "] " 19 using namespace boost;
21 using namespace chrono_literals;
25 template <
typename Duration> uint32_t ms_duration(
const Duration& d) {
26 return chrono::duration_cast<chrono::milliseconds>(d).count();
29 using socket_t = FiberSocketImpl::next_layer_type;
33 fibers::condition_variable_any cv_st;
38 fibers::mutex connect_mu;
39 chrono::steady_clock::duration connect_duration = chrono::seconds(2);
45 FiberSocketImpl::~FiberSocketImpl() {
46 VLOG(1) <<
"FiberSocketImpl::~FiberSocketImpl";
52 FiberSocketImpl::FiberSocketImpl(socket_t&& sock,
size_t rbuf_size)
53 : rbuf_size_(rbuf_size), sock_(std::move(sock)), rbuf_(new uint8_t[rbuf_size]) {
57 FiberSocketImpl::FiberSocketImpl(
const std::string& hname,
const std::string& port,
IoContext* cntx,
59 : FiberSocketImpl(socket_t(cntx->raw_context()), rbuf_size) {
60 VLOG(1) <<
"FiberSocketImpl::FiberSocketImpl " << sock_.native_handle();
64 status_ = asio::error::not_connected;
67 void FiberSocketImpl::Shutdown(error_code& ec) {
70 auto handle = sock_.native_handle();
73 DVLOG(1) <<
"Already closed " << handle;
79 sock_.shutdown(socket_t::shutdown_both, ec);
80 VSOCK(1) <<
"Sock Shutdown ";
81 if (clientsock_data_) {
82 DVSOCK(1) <<
"Sock Closed";
85 if (clientsock_data_->worker.joinable())
86 clientsock_data_->worker.join();
87 DVSOCK(1) <<
"Worker Joined";
91 if (clientsock_data_) {
92 VSOCK(1) <<
"AwaitShutdown";
93 clientsock_data_->io_cntx->AwaitSafe(cb);
99 void FiberSocketImpl::SetStatus(
const error_code& ec,
const char* where) {
102 VSOCK(1) <<
"Setting status to " << ec <<
"/" << ec.message() <<
" at " << where;
106 void FiberSocketImpl::WakeWorker() {
109 clientsock_data_->worker_ev.notify();
112 void FiberSocketImpl::InitiateConnection() {
113 CHECK(!clientsock_data_ && (&io_cntx_->raw_context() == &sock_.get_executor().context()));
115 clientsock_data_.reset(
new ClientData(io_cntx_));
116 io_cntx_->Await([
this] {
117 rslice_ = asio::buffer(rbuf_.get(), 0);
118 clientsock_data_->worker = fibers::fiber(&FiberSocketImpl::ClientWorker,
this);
123 system::error_code FiberSocketImpl::ClientWaitToConnect(uint32_t ms) {
124 if (!clientsock_data_) {
125 InitiateConnection();
127 using std::chrono::milliseconds;
129 std::unique_lock<fibers::mutex> lock(clientsock_data_->connect_mu);
130 clientsock_data_->cv_st.wait_for(lock, milliseconds(ms), [
this] {
return !status_; });
135 void FiberSocketImpl::ClientWorker() {
138 VSOCK(1) <<
"Status " << status_;
139 error_code ec = Reconnect(hname_, port_);
140 VSOCK(1) <<
"After Reconnect: " << ec <<
"/" << ec.message() <<
" is_open: " << is_open_;
141 if (ec && is_open_) {
142 this_fiber::sleep_for(10ms);
146 DCHECK(sock_.non_blocking());
149 VSOCK(3) <<
"BeforeAsyncWait";
150 sock_.async_wait(socket_t::wait_read, fibers_ext::yield[ec]);
152 LOG_IF(ERROR, is_open_) <<
"AsyncWait: " << ec.message();
156 size_t read_capacity = rbuf_size_ - rslice_.size();
157 if (read_state_ == READ_IDLE && read_capacity) {
158 uint8_t* next = static_cast<uint8_t*>(rslice_.data()) + rslice_.size();
162 size_t read_cnt = sock_.receive(asio::mutable_buffer(next, read_capacity), 0, status_);
164 rslice_ = asio::mutable_buffer(rslice_.data(), rslice_.size() + read_cnt);
165 }
else if (status_ == system::errc::resource_unavailable_try_again) {
168 VSOCK(1) <<
"Receive: " << status_.message() <<
", read_cnt " << 0;
175 VLOG(3) <<
"BeforeCvReadWait";
176 auto should_iterate = [
this] {
177 return !is_open() || (read_state_ == READ_IDLE && rslice_.size() != rbuf_size_);
180 clientsock_data_->worker_ev.await(should_iterate);
182 VLOG(3) <<
"WorkerIteration: ";
184 VLOG(1) <<
"FiberSocketReadExit";
187 ABSL_MUST_USE_RESULT system::error_code OpenAndConfigure(
bool keep_alive,
188 asio::ip::tcp::socket& sock) {
189 using namespace asio::ip;
190 system::error_code ec;
192 if (sock.is_open()) {
194 CHECK(!sock.is_open());
196 sock.open(tcp::v4(), ec);
199 CHECK(sock.is_open());
202 socket_t::reuse_address opt(
true);
203 sock.set_option(opt, ec);
206 socket_t::keep_alive opt2(keep_alive);
207 sock.set_option(opt2, ec);
211 sock.non_blocking(
true, ec);
214 return system::error_code{};
217 system::error_code FiberSocketImpl::Reconnect(
const std::string& hname,
218 const std::string& service) {
219 DCHECK(clientsock_data_);
220 using namespace asio::ip;
222 auto& asio_io_cntx = clientsock_data_->io_cntx->raw_context();
224 tcp::resolver resolver(asio_io_cntx);
226 system::error_code ec;
228 VSOCK(1) <<
"Before AsyncResolve " << is_open();
231 auto results = resolver.async_resolve(tcp::v4(), hname, service, fibers_ext::yield[ec]);
233 VLOG(1) <<
"Resolver error " << ec;
236 DVSOCK(1) <<
"After AsyncResolve, got " << results.size() <<
" results";
238 asio::steady_timer timer(asio_io_cntx, clientsock_data_->connect_duration);
239 timer.async_wait([&](
const system::error_code& ec) {
241 VSOCK(1) <<
"Cancelling after " << detail::ms_duration(clientsock_data_->connect_duration)
249 size_t result_index = 0;
250 for (
const auto& remote_dest : results) {
251 ec = OpenAndConfigure(keep_alive_, sock_);
254 sock_.async_connect(remote_dest, fibers_ext::yield[ec]);
259 if (!ec || ec == asio::error::operation_aborted)
261 VSOCK(2) <<
"Connect iteration " << result_index <<
", error " << ec;
264 VSOCK(1) <<
"After async_connect " << ec <<
" result index " << result_index;
267 SetStatus(ec,
"reconnect");
269 CHECK(sock_.non_blocking()) << native_handle();
271 socket_t::keep_alive ka_opt;
272 sock_.get_option(ka_opt, ec);
273 CHECK_EQ(keep_alive_, ka_opt.value()) << keep_alive_;
276 std::lock_guard<fibers::mutex> lock(clientsock_data_->connect_mu);
281 clientsock_data_->cv_st.notify_one();
286 IoContext& FiberSocketImpl::context() {
287 CHECK(clientsock_data_);
288 return *clientsock_data_->io_cntx;